Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/20 16:40:12 UTC

[GitHub] [flink] fapaul commented on a change in pull request #18412: [FLINK-25696][datastream] Introduce MetadataPublisher interface to SinkWriter

fapaul commented on a change in pull request #18412:
URL: https://github.com/apache/flink/pull/18412#discussion_r788946296



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -65,7 +67,9 @@
  *
  * @param <IN> The type of the input elements.
  */
-class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterState> {
+class KafkaWriter<IN>
+        implements SinkWriter<IN, KafkaCommittable, KafkaWriterState>,
+                MetadataPublisher<RecordMetadata> {

Review comment:
       Actually, my initial idea was to implement the `MetadataPublisher` on the Sink and only pass the list of subscribers to the writer.
   
   I am not sure about the final design of the table store, but my assumption is that subscribers are only added during writer creation, not at runtime. If that holds, exposing a way to add new subscribers through the writer is unnecessary; see the sketch below.
   Does that make sense?
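   
   A minimal sketch of that alternative, assuming the `MetadataPublisher` interface exposes a `subscribe(Consumer<T>)` method (inferred from `writer.subscribe(...)` in the test hunk below); the sink class here is hypothetical, not the code in this PR:
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.function.Consumer;
   
   // Assumed shape of the interface, inferred from the test below.
   interface MetadataPublisher<T> {
       void subscribe(Consumer<T> consumer);
   }
   
   // Hypothetical sink: it owns the subscriptions, and the writer only
   // receives the list collected before the writer is created.
   class MetadataPublishingSink<T> implements MetadataPublisher<T> {
   
       private final List<Consumer<T>> metadataConsumers = new ArrayList<>();
   
       @Override
       public void subscribe(Consumer<T> consumer) {
           metadataConsumers.add(consumer);
       }
   
       // Stand-in for Sink#createWriter: the subscriber list is fixed here,
       // so the writer itself never needs to expose subscribe().
       List<Consumer<T>> consumersForWriter() {
           return Collections.unmodifiableList(metadataConsumers);
       }
   }
   ```
   
   The writer would then only iterate over the fixed list in its producer callback, as the second hunk below does.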

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -391,6 +401,10 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
                         () -> throwException(metadata, exception, producer),
                         "Failed to send data to Kafka");
             }
+
+            for (Consumer<RecordMetadata> metadataConsumer : metadataConsumers) {
+                metadataConsumer.accept(metadata);

Review comment:
       Is this really safe? Currently one of the Kafka producer's I/O threads invokes this callback, so the metadata consumers run on a producer thread rather than the task thread; see the sketch below.
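   
   For illustration, one way to take the consumers off the producer thread would be to defer their invocation to the task's mailbox, mirroring how the error path above defers `throwException`. A sketch, assuming a `MailboxExecutor` is available to the callback (the wrapper class name is hypothetical):
   
   ```java
   import java.util.List;
   import java.util.function.Consumer;
   
   import org.apache.flink.api.common.operators.MailboxExecutor;
   import org.apache.kafka.clients.producer.Callback;
   import org.apache.kafka.clients.producer.RecordMetadata;
   
   // Hypothetical wrapper: runs the metadata consumers on the task's
   // mailbox thread instead of the Kafka producer I/O thread.
   class DeferringMetadataCallback implements Callback {
   
       private final MailboxExecutor mailboxExecutor;
       private final List<Consumer<RecordMetadata>> metadataConsumers;
   
       DeferringMetadataCallback(
               MailboxExecutor mailboxExecutor,
               List<Consumer<RecordMetadata>> metadataConsumers) {
           this.mailboxExecutor = mailboxExecutor;
           this.metadataConsumers = metadataConsumers;
       }
   
       @Override
       public void onCompletion(RecordMetadata metadata, Exception exception) {
           // ... error handling as in the hunk above ...
           mailboxExecutor.execute(
                   () -> {
                       for (Consumer<RecordMetadata> consumer : metadataConsumers) {
                           consumer.accept(metadata);
                       }
                   },
                   "Publish record metadata to subscribers");
       }
   }
   ```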

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
##########
@@ -177,6 +178,25 @@ public void testCurrentSendTimeMetric() throws Exception {
         }
     }
 
+    @Test
+    public void testMetadataPublisher() throws Exception {
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        getKafkaClientConfiguration(),
+                        DeliveryGuarantee.AT_LEAST_ONCE,
+                        InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()))) {
+            List<String> metadataList = new ArrayList<>();
+            writer.subscribe(meta -> metadataList.add(meta.toString()));
+            List<String> expected = new ArrayList<>();
+            for (int i = 0; i < 100; i++) {
+                writer.write(1, SINK_WRITER_CONTEXT);
+                expected.add("testMetadataPublisher-0@" + i);
+            }
+            writer.prepareCommit(false);
+            assertThat(metadataList, equalTo(expected));

Review comment:
       Nit: please use AssertJ for new assertions.
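   
   With AssertJ (which the comment above implies is available on the test classpath), the check could read:
   
   ```java
   import static org.assertj.core.api.Assertions.assertThat;
   
   // AssertJ equivalent of the assertion above:
   assertThat(metadataList).containsExactlyElementsOf(expected);
   ```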




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org