Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/28 18:26:35 UTC

[GitHub] [iceberg] stevenzwu opened a new pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

stevenzwu opened a new pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886


   …UI can show different names when there are multiple Iceberg sinks for a Flink job
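
   For context, a minimal sketch of the intended usage: two Iceberg sinks in one job, each given its own uidPrefix so their operators get distinct uids and, with this change, distinct display names in the Flink web UI. The sketch assumes only the builder methods visible in this thread (forRowData, tableLoader, uidPrefix, build); the streams and table locations are illustrative.

   ```java
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.table.data.RowData;
   import org.apache.iceberg.flink.TableLoader;
   import org.apache.iceberg.flink.sink.FlinkSink;

   public class TwoSinksOneJob {
     // Sketch only: leftStream/rightStream and the table locations are illustrative.
     static void appendSinks(DataStream<RowData> leftStream, DataStream<RowData> rightStream) {
       // Distinct uidPrefix values give each sink's writer/committer/sink
       // operators distinct uids and (with this PR) distinct names, e.g.
       // "left-writer" vs. "right-writer" in the Flink web UI.
       FlinkSink.forRowData(leftStream)
           .tableLoader(TableLoader.fromHadoopTable("file:///tmp/left"))
           .uidPrefix("left")
           .build();

       FlinkSink.forRowData(rightStream)
           .tableLoader(TableLoader.fromHadoopTable("file:///tmp/right"))
           .uidPrefix("right")
           .build();
     }
   }
   ```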




[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679759141



##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
##########
@@ -258,4 +262,58 @@ public void testShuffleByPartitionWithSchema() throws Exception {
       Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
     }
   }
+
+  @Test
+  public void testTwoSinksInDisjointedDAG() throws Exception {
+    Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+
+    final String leftTablePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath().concat("/left");

Review comment:
       Unit tests look good to me. One minor thing: in the Apache Iceberg codebase, I have almost never seen local variables declared as final. @rdblue, is this an implicit code style rule?






[GitHub] [iceberg] stevenzwu edited a comment on pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu edited a comment on pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#issuecomment-889553074


   @openinx I addressed your comments in the last commit. It was a good call to add a unit test: through it, I also found that the uid wasn't set on the map operator, which the latest commit addresses. Because I don't want to change the `Builder` class to a generic type, I added a `mappedRowDataInput` variable to distinguish this extra mapper case so that we can add the uid and name in the `build()` method.




[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r682548625



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -135,6 +135,13 @@ private Builder forRowData(DataStream<RowData> newRowDataInput) {
       return this;
     }
 
+    private <T> Builder forMapperOutputType(DataStream<T> input,
+                                        MapFunction<T, RowData> mapper,

Review comment:
       Yeah,  this is needed.






[GitHub] [iceberg] openinx merged pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx merged pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886


   




[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679097460



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -246,52 +248,71 @@ public Builder uidPrefix(String newPrefix) {
         }
       }
 
-      // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds = Lists.newArrayList();
-      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-        for (String column : equalityFieldColumns) {
-          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-              column, table.schema());
-          equalityFieldIds.add(field.fieldId());
-        }
-      }
-
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
       // Distribute the records from input data stream based on the write.distribution-mode.
       rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
 
-      // Chain the iceberg stream writer and committer operator.
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
-      IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
+      // Add parallel writers that append rows to files
+      SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(rowDataInput, flinkRowType);

Review comment:
       The [unchecked](https://github.com/apache/iceberg/pull/2886/files#diff-a26cb8b3f0f5b0e6a9f110bb0b3fdd0ed625bfc9faf27e248f6446818cad4d69L234) warning can be removed now?






[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679095395



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -209,21 +209,23 @@ public Builder equalityFieldColumns(List<String> columns) {
 
     /**
      * Set the uid prefix for FlinkSink operators. Note that FlinkSink internally consists of multiple operators (like
-     * writer, committer, dummy sink etc.) Actually operator uid will be appended with a suffix like "uid-writer".
-     * <p>
+     * writer, committer, dummy sink etc.) Actually operator uid will be appended with a suffix like "uidPrefix-writer".
+     * <br><br>
+     * If provided, this prefix is also applied to operator names.
+     * <br><br>
      * Flink auto generates operator uids if not set explicitly. It is a recommended
      * <a href="https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/">
      * best-practice to set uid for all operators</a> before deploying to production. Flink has an option to {@code
      * pipeline.auto-generate-uids=false} to disable auto-generation and force explicit setting of all operator uids.
-     * <p>
+     * <br><br>
      * Be careful with setting this for an existing job, because now we are changing the opeartor uid from an

Review comment:
       `opeartor`? Typo?
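
    The javadoc above also mentions `pipeline.auto-generate-uids`. As a hedged sketch (assuming Flink's `PipelineOptions.AUTO_GENERATE_UIDS` option, available in recent Flink releases), disabling auto-generation makes job translation fail fast when any operator is missing an explicit uid:

    ```java
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.PipelineOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExplicitUids {
      static StreamExecutionEnvironment createEnv() {
        // pipeline.auto-generate-uids=false: translating the job graph should
        // fail if any operator (including FlinkSink's internal ones) lacks an
        // explicit uid, catching missing uids before production deployment.
        Configuration conf = new Configuration();
        conf.set(PipelineOptions.AUTO_GENERATE_UIDS, false);
        return StreamExecutionEnvironment.getExecutionEnvironment(conf);
      }
    }
    ```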






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r682999785



##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
##########
@@ -258,4 +262,58 @@ public void testShuffleByPartitionWithSchema() throws Exception {
       Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
     }
   }
+
+  @Test
+  public void testTwoSinksInDisjointedDAG() throws Exception {
+    Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+
+    final String leftTablePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath().concat("/left");

Review comment:
       Done. I removed all the final modifiers for local vars in this PR.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r683000689



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -118,6 +117,7 @@ public static Builder forRowData(DataStream<RowData> input) {
 
   public static class Builder {
     private DataStream<RowData> rowDataInput = null;
+    private SingleOutputStreamOperator<RowData> mappedRowDataInput = null;

Review comment:
       Great idea. Switched to this lazy init mode.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -135,6 +135,13 @@ private Builder forRowData(DataStream<RowData> newRowDataInput) {
       return this;
     }
 
+    private <T> Builder forMapperOutputType(DataStream<T> input,
+                                        MapFunction<T, RowData> mapper,

Review comment:
       fixed






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r680086367



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -135,6 +135,13 @@ private Builder forRowData(DataStream<RowData> newRowDataInput) {
       return this;
     }
 
+    private <T> Builder forMapperOutputType(DataStream<T> input,
+                                        MapFunction<T, RowData> mapper,

Review comment:
       thx. will fix (along with any other changes if needed)






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r680084494



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -246,52 +254,83 @@ public Builder uidPrefix(String newPrefix) {
         }
       }
 
-      // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds = Lists.newArrayList();
-      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-        for (String column : equalityFieldColumns) {
-          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-              column, table.schema());
-          equalityFieldIds.add(field.fieldId());
+      // set name and uid for mappedRowDataInput if needed
+      if (mappedRowDataInput != null) {
+        if (uidPrefix != null) {
+          rowDataInput = mappedRowDataInput
+              .name(operatorName(uidPrefix))
+              .uid(uidPrefix + "-mapper");
+        } else {
+          rowDataInput = mappedRowDataInput;
         }
       }
 
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
       // Distribute the records from input data stream based on the write.distribution-mode.
-      rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
+      DataStream<RowData> distributeStream = distributeDataStream(
+          rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);

Review comment:
       `keyBy` returns a `KeyedStream`, which extends `DataStream` but doesn't expose `name` and `uid`.
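
    To illustrate the type-level point, a toy sketch using only the standard Flink DataStream API (names are made up):

    ```java
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

    public class KeyByNaming {
      static DataStream<String> demo(DataStream<String> input) {
        // keyBy only re-partitions the stream and adds no operator, so the
        // returned KeyedStream has no name()/uid() methods to call.
        KeyedStream<String, String> keyed = input.keyBy(value -> value);

        // name()/uid() live on SingleOutputStreamOperator, which real
        // operators like map()/transform() return; this is why FlinkSink
        // names the writer and committer operators, not the keyBy itself.
        SingleOutputStreamOperator<String> mapped = keyed
            .map(String::toUpperCase)
            .name("uppercase-mapper")
            .uid("demo-uppercase-mapper");
        return mapped;
      }
    }
    ```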






[GitHub] [iceberg] stevenzwu commented on pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#issuecomment-893181774


   @openinx please take another look. Thanks!




[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679741972



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -135,6 +135,13 @@ private Builder forRowData(DataStream<RowData> newRowDataInput) {
       return this;
     }
 
+    private <T> Builder forMapperOutputType(DataStream<T> input,
+                                        MapFunction<T, RowData> mapper,

Review comment:
       Nit: the code formatting (continuation-line alignment of the parameters) is off here.






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679365088



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -246,52 +248,71 @@ public Builder uidPrefix(String newPrefix) {
         }
       }
 
-      // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds = Lists.newArrayList();
-      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-        for (String column : equalityFieldColumns) {
-          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-              column, table.schema());
-          equalityFieldIds.add(field.fieldId());
-        }
-      }
-
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
       // Distribute the records from input data stream based on the write.distribution-mode.
       rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
 
-      // Chain the iceberg stream writer and committer operator.
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
-      IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
+      // Add parallel writers that append rows to files
+      SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(rowDataInput, flinkRowType);
 
-      this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
+      // Add single-parallelism committer that commits files
+      // after successful checkpoint or end of input
+      SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
 
-      SingleOutputStreamOperator<WriteResult> writerStream = rowDataInput
-          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
-          .setParallelism(writeParallelism);
+      // Add dummy discard sink
+      return appendDummySink(committerStream);
+    }
+
+    private String getOperatorName(String suffix) {
+      return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
+    }
+
+    private DataStreamSink<RowData> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
+      DataStreamSink<RowData> resultStream = committerStream
+          .addSink(new DiscardingSink())
+          .name(getOperatorName(String.format("IcebergSink %s", this.table.name())))
+          .setParallelism(1);
       if (uidPrefix != null) {
-        writerStream = writerStream.uid(uidPrefix + "-writer");
+        resultStream = resultStream.uid(uidPrefix + "-dummysink");
       }
+      return resultStream;
+    }
 
+    private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
+      final IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
       SingleOutputStreamOperator<Void> committerStream = writerStream
-          .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
+          .transform(getOperatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
           .setParallelism(1)
           .setMaxParallelism(1);
       if (uidPrefix != null) {
         committerStream = committerStream.uid(uidPrefix + "-committer");
       }
+      return committerStream;
+    }
 
-      DataStreamSink<RowData> resultStream = committerStream
-          .addSink(new DiscardingSink())
-          .name(String.format("IcebergSink %s", table.name()))
-          .setParallelism(1);
+    private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {

Review comment:
       We still reference the stream after `distributeDataStream`, using the old style of a mutable `rowDataInput`. I definitely see that it is easy to get confused; let me update the code to make it more consistent and clear.








[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679100142



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -246,52 +248,71 @@ public Builder uidPrefix(String newPrefix) {
         }
       }
 
-      // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds = Lists.newArrayList();
-      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-        for (String column : equalityFieldColumns) {
-          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-              column, table.schema());
-          equalityFieldIds.add(field.fieldId());
-        }
-      }
-
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
       // Distribute the records from input data stream based on the write.distribution-mode.
       rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
 
-      // Chain the iceberg stream writer and committer operator.
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
-      IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
+      // Add parallel writers that append rows to files
+      SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(rowDataInput, flinkRowType);
 
-      this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
+      // Add single-parallelism committer that commits files
+      // after successful checkpoint or end of input
+      SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
 
-      SingleOutputStreamOperator<WriteResult> writerStream = rowDataInput
-          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
-          .setParallelism(writeParallelism);
+      // Add dummy discard sink
+      return appendDummySink(committerStream);
+    }
+
+    private String getOperatorName(String suffix) {
+      return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
+    }
+
+    private DataStreamSink<RowData> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
+      DataStreamSink<RowData> resultStream = committerStream
+          .addSink(new DiscardingSink())
+          .name(getOperatorName(String.format("IcebergSink %s", this.table.name())))
+          .setParallelism(1);
       if (uidPrefix != null) {
-        writerStream = writerStream.uid(uidPrefix + "-writer");
+        resultStream = resultStream.uid(uidPrefix + "-dummysink");
       }
+      return resultStream;
+    }
 
+    private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
+      final IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
       SingleOutputStreamOperator<Void> committerStream = writerStream
-          .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
+          .transform(getOperatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
           .setParallelism(1)
           .setMaxParallelism(1);
       if (uidPrefix != null) {
         committerStream = committerStream.uid(uidPrefix + "-committer");
       }
+      return committerStream;
+    }
 
-      DataStreamSink<RowData> resultStream = committerStream
-          .addSink(new DiscardingSink())
-          .name(String.format("IcebergSink %s", table.name()))
-          .setParallelism(1);
+    private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {

Review comment:
       Shouldn't this break any unit test that exercises the [distributeDataStream](https://github.com/apache/iceberg/pull/2886/files#diff-a26cb8b3f0f5b0e6a9f110bb0b3fdd0ed625bfc9faf27e248f6446818cad4d69R255) logic? I see we did not use the `input` data stream when chaining the iceberg writers into the stream.






[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679745954



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -118,6 +117,7 @@ public static Builder forRowData(DataStream<RowData> input) {
 
   public static class Builder {
     private DataStream<RowData> rowDataInput = null;
+    private SingleOutputStreamOperator<RowData> mappedRowDataInput = null;

Review comment:
       I'd prefer not to introduce another `DataStream<RowData>` named `mappedRowDataInput` into this Builder, because that could easily confuse people: which `DataStream` should be used to chain the following operators? Let's keep just the one `rowDataInput`; even if we've chained a `MapFunction`, we could still add the `uid` and `name` to the `Map` operator lazily.






[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r682548420



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -118,6 +117,7 @@ public static Builder forRowData(DataStream<RowData> input) {
 
   public static class Builder {
     private DataStream<RowData> rowDataInput = null;
+    private SingleOutputStreamOperator<RowData> mappedRowDataInput = null;

Review comment:
       I tried to initialize the input stream lazily as follows, because I think it makes the source `rowDataInput` stream very clear: it's the only stream we need for chaining the following operators.
   
   ```diff
   diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
   index bda6667cd..a6be26b31 100644
   --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
   +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
   @@ -24,6 +24,7 @@ import java.io.UncheckedIOException;
    import java.util.List;
    import java.util.Locale;
    import java.util.Map;
   +import java.util.function.Function;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
   @@ -117,7 +118,7 @@ public class FlinkSink {
    
      public static class Builder {
        private DataStream<RowData> rowDataInput = null;
   -    private SingleOutputStreamOperator<RowData> mappedRowDataInput = null;
   +    private Function<String, DataStream<RowData>> inputCreator = null;
        private TableLoader tableLoader;
        private Table table;
        private TableSchema tableSchema;
   @@ -136,9 +137,17 @@ public class FlinkSink {
        }
    
        private <T> Builder forMapperOutputType(DataStream<T> input,
   -                                        MapFunction<T, RowData> mapper,
   -                                        TypeInformation<RowData> outputType) {
   -      this.mappedRowDataInput = input.map(mapper, outputType);
   +                                            MapFunction<T, RowData> mapper,
   +                                            TypeInformation<RowData> outputType) {
   +      this.inputCreator = newUidPrefix -> {
   +        if (newUidPrefix != null) {
   +          return input.map(mapper, outputType)
   +              .name(operatorName(newUidPrefix))
   +              .uid(newUidPrefix + "-mapper");
   +        } else {
   +          return input.map(mapper, outputType);
   +        }
   +      };
          return this;
        }
    
   @@ -241,8 +250,11 @@ public class FlinkSink {
        }
    
        public DataStreamSink<RowData> build() {
   -      Preconditions.checkArgument(rowDataInput != null || mappedRowDataInput != null,
   -          "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
   +      Preconditions.checkArgument(rowDataInput == null || inputCreator == null,
   +          "Use forRowData() or forRow() to initialize the input DataStream only once.");
   +      Preconditions.checkArgument(rowDataInput != null || inputCreator != null,
   +          "Please use forRowData() or forRow() to initialize the input DataStream.");
   +      rowDataInput = rowDataInput == null ? inputCreator.apply(uidPrefix) : rowDataInput;
          Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");
    
          if (table == null) {
   @@ -254,17 +266,6 @@ public class FlinkSink {
            }
          }
    
   -      // set name and uid for mappedRowDataInput if needed
   -      if (mappedRowDataInput != null) {
   -        if (uidPrefix != null) {
   -          rowDataInput = mappedRowDataInput
   -              .name(operatorName(uidPrefix))
   -              .uid(uidPrefix + "-mapper");
   -        } else {
   -          rowDataInput = mappedRowDataInput;
   -        }
   -      }
   -
          // Convert the requested flink table schema to flink row type.
          RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
   ```
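
    In short, the lazy `inputCreator` defers creating the map operator until `build()`, when the final `uidPrefix` is known; the two preconditions also guarantee that the input stream was initialized exactly once, via either `forRowData()` or `forRow()`.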






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679394680



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -246,52 +248,71 @@ public Builder uidPrefix(String newPrefix) {
         }
       }
 
-      // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds = Lists.newArrayList();
-      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-        for (String column : equalityFieldColumns) {
-          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-              column, table.schema());
-          equalityFieldIds.add(field.fieldId());
-        }
-      }
-
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
       // Distribute the records from input data stream based on the write.distribution-mode.
       rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
 
-      // Chain the iceberg stream writer and committer operator.
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
-      IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
+      // Add parallel writers that append rows to files
+      SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(rowDataInput, flinkRowType);
 
-      this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
+      // Add single-parallelism committer that commits files
+      // after successful checkpoint or end of input
+      SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
 
-      SingleOutputStreamOperator<WriteResult> writerStream = rowDataInput
-          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
-          .setParallelism(writeParallelism);
+      // Add dummy discard sink
+      return appendDummySink(committerStream);
+    }
+
+    private String getOperatorName(String suffix) {
+      return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
+    }
+
+    private DataStreamSink<RowData> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
+      DataStreamSink<RowData> resultStream = committerStream
+          .addSink(new DiscardingSink())
+          .name(getOperatorName(String.format("IcebergSink %s", this.table.name())))
+          .setParallelism(1);
       if (uidPrefix != null) {
-        writerStream = writerStream.uid(uidPrefix + "-writer");
+        resultStream = resultStream.uid(uidPrefix + "-dummysink");
       }
+      return resultStream;
+    }
 
+    private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
+      final IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
       SingleOutputStreamOperator<Void> committerStream = writerStream
-          .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
+          .transform(getOperatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
           .setParallelism(1)
           .setMaxParallelism(1);
       if (uidPrefix != null) {
         committerStream = committerStream.uid(uidPrefix + "-committer");
       }
+      return committerStream;
+    }
 
-      DataStreamSink<RowData> resultStream = committerStream
-          .addSink(new DiscardingSink())
-          .name(String.format("IcebergSink %s", table.name()))
-          .setParallelism(1);
+    private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {

Review comment:
       After re-reading the code, I see your point now. Will fix the `input` references.






[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r682548420



##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
##########
@@ -258,4 +262,58 @@ public void testShuffleByPartitionWithSchema() throws Exception {
       Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
     }
   }
+
+  @Test
+  public void testTwoSinksInDisjointedDAG() throws Exception {
+    Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+
+    final String leftTablePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath().concat("/left");

Review comment:
       I think we could follow the rule, though it seems to be only an implicit code style rule for now.






[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679109390



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -246,52 +248,71 @@ public Builder uidPrefix(String newPrefix) {
         }
       }
 
-      // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds = Lists.newArrayList();
-      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-        for (String column : equalityFieldColumns) {
-          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-              column, table.schema());
-          equalityFieldIds.add(field.fieldId());
-        }
-      }
-
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
       // Distribute the records from input data stream based on the write.distribution-mode.
       rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
 
-      // Chain the iceberg stream writer and committer operator.
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
-      IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
+      // Add parallel writers that append rows to files
+      SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(rowDataInput, flinkRowType);
 
-      this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
+      // Add single-parallelism committer that commits files
+      // after successful checkpoint or end of input
+      SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
 
-      SingleOutputStreamOperator<WriteResult> writerStream = rowDataInput
-          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
-          .setParallelism(writeParallelism);
+      // Add dummy discard sink
+      return appendDummySink(committerStream);
+    }
+
+    private String getOperatorName(String suffix) {

Review comment:
       In Iceberg, we usually remove the `get` prefix from getter methods...






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679363565



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -246,52 +248,71 @@ public Builder uidPrefix(String newPrefix) {
         }
       }
 
-      // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds = Lists.newArrayList();
-      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-        for (String column : equalityFieldColumns) {
-          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-              column, table.schema());
-          equalityFieldIds.add(field.fieldId());
-        }
-      }
-
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
       // Distribute the records from input data stream based on the write.distribution-mode.
       rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
 
-      // Chain the iceberg stream writer and committer operator.
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
-      IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
+      // Add parallel writers that append rows to files
+      SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(rowDataInput, flinkRowType);
 
-      this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
+      // Add single-parallelism committer that commits files
+      // after successful checkpoint or end of input
+      SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
 
-      SingleOutputStreamOperator<WriteResult> writerStream = rowDataInput
-          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
-          .setParallelism(writeParallelism);
+      // Add dummy discard sink
+      return appendDummySink(committerStream);
+    }
+
+    private String getOperatorName(String suffix) {
+      return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
+    }
+
+    private DataStreamSink<RowData> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
+      DataStreamSink<RowData> resultStream = committerStream
+          .addSink(new DiscardingSink())
+          .name(getOperatorName(String.format("IcebergSink %s", this.table.name())))
+          .setParallelism(1);
       if (uidPrefix != null) {
-        writerStream = writerStream.uid(uidPrefix + "-writer");
+        resultStream = resultStream.uid(uidPrefix + "-dummysink");
       }
+      return resultStream;
+    }

Review comment:
       You are right here. But the problem is the build method return type.
   ```
   public DataStreamSink<RowData> build()
   ```






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679363565



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -246,52 +248,71 @@ public Builder uidPrefix(String newPrefix) {
         }
       }
 
-      // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds = Lists.newArrayList();
-      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-        for (String column : equalityFieldColumns) {
-          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-              column, table.schema());
-          equalityFieldIds.add(field.fieldId());
-        }
-      }
-
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
       // Distribute the records from input data stream based on the write.distribution-mode.
       rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
 
-      // Chain the iceberg stream writer and committer operator.
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
-      IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
+      // Add parallel writers that append rows to files
+      SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(rowDataInput, flinkRowType);
 
-      this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
+      // Add single-parallelism committer that commits files
+      // after successful checkpoint or end of input
+      SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
 
-      SingleOutputStreamOperator<WriteResult> writerStream = rowDataInput
-          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
-          .setParallelism(writeParallelism);
+      // Add dummy discard sink
+      return appendDummySink(committerStream);
+    }
+
+    private String getOperatorName(String suffix) {
+      return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
+    }
+
+    private DataStreamSink<RowData> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
+      DataStreamSink<RowData> resultStream = committerStream
+          .addSink(new DiscardingSink())
+          .name(getOperatorName(String.format("IcebergSink %s", this.table.name())))
+          .setParallelism(1);
       if (uidPrefix != null) {
-        writerStream = writerStream.uid(uidPrefix + "-writer");
+        resultStream = resultStream.uid(uidPrefix + "-dummysink");
       }
+      return resultStream;
+    }

Review comment:
       You are right here. I see the problem is the build method's return type:
   ```
   public DataStreamSink<RowData> build()
   ```






[GitHub] [iceberg] stevenzwu commented on pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#issuecomment-893181774


   @openinx please take another look. Thanks!




[GitHub] [iceberg] stevenzwu commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r682999785



##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
##########
@@ -258,4 +262,58 @@ public void testShuffleByPartitionWithSchema() throws Exception {
       Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
     }
   }
+
+  @Test
+  public void testTwoSinksInDisjointedDAG() throws Exception {
+    Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+
+    final String leftTablePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath().concat("/left");

Review comment:
       Done. I removed all the `final` modifiers for local vars in this PR.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -118,6 +117,7 @@ public static Builder forRowData(DataStream<RowData> input) {
 
   public static class Builder {
     private DataStream<RowData> rowDataInput = null;
+    private SingleOutputStreamOperator<RowData> mappedRowDataInput = null;

Review comment:
       Great idea. Switched to this lazy init mode.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -135,6 +135,13 @@ private Builder forRowData(DataStream<RowData> newRowDataInput) {
       return this;
     }
 
+    private <T> Builder forMapperOutputType(DataStream<T> input,
+                                        MapFunction<T, RowData> mapper,

Review comment:
       Fixed.






[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679753863



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -246,52 +248,71 @@ public Builder uidPrefix(String newPrefix) {
         }
       }
 
-      // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds = Lists.newArrayList();
-      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-        for (String column : equalityFieldColumns) {
-          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-              column, table.schema());
-          equalityFieldIds.add(field.fieldId());
-        }
-      }
-
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
       // Distribute the records from input data stream based on the write.distribution-mode.
       rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
 
-      // Chain the iceberg stream writer and committer operator.
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
-      IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
+      // Add parallel writers that append rows to files
+      SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(rowDataInput, flinkRowType);
 
-      this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
+      // Add single-parallelism committer that commits files
+      // after successful checkpoint or end of input
+      SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
 
-      SingleOutputStreamOperator<WriteResult> writerStream = rowDataInput
-          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
-          .setParallelism(writeParallelism);
+      // Add dummy discard sink
+      return appendDummySink(committerStream);
+    }
+
+    private String getOperatorName(String suffix) {
+      return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
+    }
+
+    private DataStreamSink<RowData> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
+      DataStreamSink<RowData> resultStream = committerStream
+          .addSink(new DiscardingSink())
+          .name(getOperatorName(String.format("IcebergSink %s", this.table.name())))
+          .setParallelism(1);
       if (uidPrefix != null) {
-        writerStream = writerStream.uid(uidPrefix + "-writer");
+        resultStream = resultStream.uid(uidPrefix + "-dummysink");
       }
+      return resultStream;
+    }

Review comment:
       I think `build()` will need to be changed to return a `DataStreamSink<Void>`, because the stream won't actually emit any real `RowData`. But as it's a public API, will this break existing upstream Flink DataStream jobs?
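
For illustration, a hedged sketch of the source-compatibility concern raised here (`input`, `table`, and `tableLoader` are assumed to exist in the caller's job):

```java
// A caller today can legally write this against the current public API:
DataStreamSink<RowData> sink = FlinkSink.forRowData(input)
    .table(table)
    .tableLoader(tableLoader)
    .build();

// If build() were retyped to DataStreamSink<Void>, the declaration above
// would stop compiling, so the change is source-breaking for existing jobs
// even though no RowData ever flows past the DiscardingSink.
```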






[GitHub] [iceberg] openinx commented on pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#issuecomment-888779550


   Thanks @stevenzwu for the PR. Let me take a look...




[GitHub] [iceberg] stevenzwu commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r680088261



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -118,6 +117,7 @@ public static Builder forRowData(DataStream<RowData> input) {
 
   public static class Builder {
     private DataStream<RowData> rowDataInput = null;
+    private SingleOutputStreamOperator<RowData> mappedRowDataInput = null;

Review comment:
       Note that `mappedRowDataInput` is intentionally defined as `SingleOutputStreamOperator` (not `DataStream`), because `DataStream` doesn't expose the `uid` and `name` methods, and hence we can't lazily add them to the `Map` operator through a `DataStream<RowData> rowDataInput`.
   
       I agree that this is a little confusing and less than ideal. Another option is to change Builder to a generic class like `Builder<T>`, but I don't like that either and hence used the current implementation. I am very open to other alternatives.
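
One alternative that avoids both the extra field and a generic `Builder<T>` is to defer the mapping until `build()`. A hedged sketch, assuming an `inputCreator` function field on the `Builder` (the `getOperatorName` helper is the one from this PR):

```java
// Capture the raw input and the mapper in a lazily-invoked factory, so the
// uid/name can be attached to the Map operator once uidPrefix is known.
private java.util.function.Function<String, DataStream<RowData>> inputCreator = null;

private <T> Builder forMapperOutputType(DataStream<T> input,
                                        MapFunction<T, RowData> mapper,
                                        TypeInformation<RowData> outputType) {
  this.inputCreator = newUidPrefix -> {
    if (newUidPrefix != null) {
      return input.map(mapper, outputType)
          .name(getOperatorName(newUidPrefix))
          .uid(newUidPrefix + "-mapper");
    } else {
      return input.map(mapper, outputType);
    }
  };
  return this;
}
```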






[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679747510



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -246,52 +254,83 @@ public Builder uidPrefix(String newPrefix) {
         }
       }
 
-      // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds = Lists.newArrayList();
-      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-        for (String column : equalityFieldColumns) {
-          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-              column, table.schema());
-          equalityFieldIds.add(field.fieldId());
+      // set name and uid for mappedRowDataInput if needed
+      if (mappedRowDataInput != null) {
+        if (uidPrefix != null) {
+          rowDataInput = mappedRowDataInput
+              .name(operatorName(uidPrefix))
+              .uid(uidPrefix + "-mapper");
+        } else {
+          rowDataInput = mappedRowDataInput;
         }
       }
 
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
       // Distribute the records from input data stream based on the write.distribution-mode.
-      rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
+      DataStream<RowData> distributeStream = distributeDataStream(
+          rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);

Review comment:
       Should we also add the uid & name if we've already chained the `PartitionKeySelector`?
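
For reference: `keyBy` only installs a partitioner and returns a `KeyedStream`, which does not expose `uid()`/`name()`, so there is no operator to name on that step; the uid lands on the transform that follows. A hedged illustration (the `PartitionKeySelector` constructor arguments are assumed):

```java
// keyBy(...) re-partitions records but creates no operator of its own.
KeyedStream<RowData, String> keyed = rowDataInput.keyBy(
    new PartitionKeySelector(table.spec(), table.schema(), flinkRowType));
// keyed.uid("...") would not compile: KeyedStream has no uid() method.
```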






[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679108436



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -246,52 +248,71 @@ public Builder uidPrefix(String newPrefix) {
         }
       }
 
-      // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds = Lists.newArrayList();
-      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-        for (String column : equalityFieldColumns) {
-          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-              column, table.schema());
-          equalityFieldIds.add(field.fieldId());
-        }
-      }
-
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
       // Distribute the records from input data stream based on the write.distribution-mode.
       rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
 
-      // Chain the iceberg stream writer and committer operator.
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
-      IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
+      // Add parallel writers that append rows to files
+      SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(rowDataInput, flinkRowType);
 
-      this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
+      // Add single-parallelism committer that commits files
+      // after successful checkpoint or end of input
+      SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
 
-      SingleOutputStreamOperator<WriteResult> writerStream = rowDataInput
-          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
-          .setParallelism(writeParallelism);
+      // Add dummy discard sink
+      return appendDummySink(committerStream);
+    }
+
+    private String getOperatorName(String suffix) {
+      return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
+    }
+
+    private DataStreamSink<RowData> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
+      DataStreamSink<RowData> resultStream = committerStream
+          .addSink(new DiscardingSink())
+          .name(getOperatorName(String.format("IcebergSink %s", this.table.name())))
+          .setParallelism(1);
       if (uidPrefix != null) {
-        writerStream = writerStream.uid(uidPrefix + "-writer");
+        resultStream = resultStream.uid(uidPrefix + "-dummysink");
       }
+      return resultStream;
+    }

Review comment:
       Seems like the correct code should be the following, so that we could get rid of the IDE warning:
   
   ```java
       private DataStreamSink<Void> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
         DataStreamSink<Void> resultStream = committerStream
             .addSink(new DiscardingSink<>())
             .name(getOperatorName(String.format("IcebergSink %s", this.table.name())))
             .setParallelism(1);
         if (uidPrefix != null) {
           resultStream = resultStream.uid(uidPrefix + "-dummysink");
         }
         return resultStream;
       }
   ```
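
As a usage note on the sketch above: with the `<>` diamond, `DiscardingSink<Void>` is inferred from the `Void`-typed committer stream, which is what clears the raw-type warning; the `DataStreamSink<Void>` return type then states explicitly that nothing flows past this sink.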






[GitHub] [iceberg] stevenzwu commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r679365088



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -246,52 +248,71 @@ public Builder uidPrefix(String newPrefix) {
         }
       }
 
-      // Find out the equality field id list based on the user-provided equality field column names.
-      List<Integer> equalityFieldIds = Lists.newArrayList();
-      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-        for (String column : equalityFieldColumns) {
-          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-              column, table.schema());
-          equalityFieldIds.add(field.fieldId());
-        }
-      }
-
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
       // Distribute the records from input data stream based on the write.distribution-mode.
       rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
 
-      // Chain the iceberg stream writer and committer operator.
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
-      IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
+      // Add parallel writers that append rows to files
+      SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(rowDataInput, flinkRowType);
 
-      this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
+      // Add single-parallelism committer that commits files
+      // after successful checkpoint or end of input
+      SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
 
-      SingleOutputStreamOperator<WriteResult> writerStream = rowDataInput
-          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
-          .setParallelism(writeParallelism);
+      // Add dummy discard sink
+      return appendDummySink(committerStream);
+    }
+
+    private String getOperatorName(String suffix) {
+      return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
+    }
+
+    private DataStreamSink<RowData> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
+      DataStreamSink<RowData> resultStream = committerStream
+          .addSink(new DiscardingSink())
+          .name(getOperatorName(String.format("IcebergSink %s", this.table.name())))
+          .setParallelism(1);
       if (uidPrefix != null) {
-        writerStream = writerStream.uid(uidPrefix + "-writer");
+        resultStream = resultStream.uid(uidPrefix + "-dummysink");
       }
+      return resultStream;
+    }
 
+    private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
+      final IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
       SingleOutputStreamOperator<Void> committerStream = writerStream
-          .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
+          .transform(getOperatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
           .setParallelism(1)
           .setMaxParallelism(1);
       if (uidPrefix != null) {
         committerStream = committerStream.uid(uidPrefix + "-committer");
       }
+      return committerStream;
+    }
 
-      DataStreamSink<RowData> resultStream = committerStream
-          .addSink(new DiscardingSink())
-          .name(String.format("IcebergSink %s", table.name()))
-          .setParallelism(1);
+    private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {

Review comment:
       We are still referencing the stream after `distributeDataStream`, since we are still using the old style of mutating `rowDataInput`. Hence the unit tests run fine.
   
       I definitely see that it is easy to get confused. Let me update the code to make it more consistent and clear.
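
A hedged sketch of that more explicit style, reusing the helper names from this PR (`distributeDataStream`, `appendWriter`, `appendCommitter`, `appendDummySink`):

```java
// Thread each stage's output into the next stage explicitly instead of
// reassigning the mutable rowDataInput field.
DataStream<RowData> distributeStream = distributeDataStream(
    rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);
SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
return appendDummySink(committerStream);
```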






[GitHub] [iceberg] openinx commented on a change in pull request #2886: Flink: if provided, add uidPrefix to operator name so that Flink web …

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r682549277



##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
##########
@@ -258,4 +262,58 @@ public void testShuffleByPartitionWithSchema() throws Exception {
       Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
     }
   }
+
+  @Test
+  public void testTwoSinksInDisjointedDAG() throws Exception {
+    Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+
+    final String leftTablePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath().concat("/left");

Review comment:
       I think we could follow the rule, though it seems to be an implicit code style rule for now.



