Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/31 03:07:25 UTC

[GitHub] [flink] leonardBang commented on a diff in pull request #21186: [FLINK-29480][Connector/Kafka] Skip null records when writing

leonardBang commented on code in PR #21186:
URL: https://github.com/apache/flink/pull/21186#discussion_r1008991930


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -194,8 +194,10 @@
     public void write(IN element, Context context) throws IOException {
         final ProducerRecord<byte[], byte[]> record =
                 recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());

Review Comment:
   ```suggestion
       public void write(@Nullable IN element, Context context) throws IOException {
           final ProducerRecord<byte[], byte[]> record =
                   recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
   ```
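
   For context, a minimal sketch of how the null-skip guard this PR introduces could look in `KafkaWriter#write`, assuming the `@Nullable` suggestion above is applied. The `currentProducer`, `deliveryCallback`, and counter names below are illustrative assumptions, not taken from the diff:
   ```java
   public void write(@Nullable IN element, Context context) throws IOException {
       final ProducerRecord<byte[], byte[]> record =
               recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
       // Assumed guard illustrating the PR's intent: a serializer may return null
       // to signal "do not write this element", so the writer skips it instead of
       // handing null to KafkaProducer#send.
       if (record == null) {
           return;
       }
       currentProducer.send(record, deliveryCallback); // field names assumed
       numRecordsOutCounter.inc();
   }
   ```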



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java:
##########
@@ -54,7 +54,7 @@ default void open(
      * @param element element to be serialized
      * @param context context to possibly determine target partition
      * @param timestamp timestamp
-     * @return Kafka {@link ProducerRecord}
+     * @return Kafka {@link ProducerRecord} (null if the element cannot be serialized)
      */
     ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp);

Review Comment:
   ```suggestion
     * @return Kafka {@link ProducerRecord} or null if the given element cannot be serialized
        */
       @Nullable ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp);
   ```
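
   On the serializer side, a minimal sketch of an implementation that uses this nullable contract; the class name, topic, and skip condition are illustrative assumptions:
   ```java
   import java.nio.charset.StandardCharsets;
   import javax.annotation.Nullable;
   import org.apache.kafka.clients.producer.ProducerRecord;

   /** Sketch only: drops elements it cannot serialize by returning null. */
   class SkippingSerializationSchema implements KafkaRecordSerializationSchema<String> {
       @Nullable
       @Override
       public ProducerRecord<byte[], byte[]> serialize(
               String element, KafkaSinkContext context, Long timestamp) {
           if (element == null || element.isEmpty()) {
               // returning null asks the KafkaWriter to silently skip this element
               return null;
           }
           return new ProducerRecord<>(
                   "example-topic", element.getBytes(StandardCharsets.UTF_8));
       }
   }
   ```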



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##########
@@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws Exception {
             assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
             assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
 
+            // elements that cannot be serialized should be silently skipped

Review Comment:
   minor: we can improve this note. Not all serializers return `null` when an element cannot be serialized, and not every `null` means the element could not be serialized. The only thing we can guarantee is that the `KafkaSinkWriter` silently skips `null` records returned by the serializer.
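   For example, the note could read:
   ```java
   // the KafkaWriter silently skips null records returned by the serializer,
   // regardless of why the serializer returned null
   ```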



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##########
@@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws Exception {
             assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
             assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
 
+            // elements that cannot be serialized should be silently skipped
+            writer.write(null, SINK_WRITER_CONTEXT);
+            timeService.trigger();
+            assertThat(numBytesOut.getCount()).isEqualTo(0L);
+            assertThat(numRecordsOut.getCount()).isEqualTo(0);
+            assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
+            assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
+
+            // but properly serialized elements should count just normally

Review Comment:
   Could we add a new test like `testWriteNullElement` instead of modifying an existing test?
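
   A minimal sketch of such a dedicated test, reusing the names visible in the diff above (`SINK_WRITER_CONTEXT`, `timeService`, the metric counters); the writer and counter setup is assumed to mirror the surrounding test class:
   ```java
   @Test
   public void testWriteNullElement() throws Exception {
       // createWriterWithConfiguration and the counter lookups are assumed to
       // exist in this test class, mirroring the existing tests' setup.
       try (final KafkaWriter<Integer> writer =
               createWriterWithConfiguration(
                       getKafkaClientConfiguration(), DeliveryGuarantee.AT_LEAST_ONCE)) {
           writer.write(null, SINK_WRITER_CONTEXT);
           timeService.trigger();
           // a null record from the serializer must be skipped silently:
           // nothing is sent and no error counter is incremented
           assertThat(numBytesOut.getCount()).isEqualTo(0L);
           assertThat(numRecordsOut.getCount()).isEqualTo(0);
           assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
           assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
       }
   }
   ```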


