Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/11 06:16:56 UTC

[GitHub] [flink] mas-chen commented on a diff in pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

mas-chen commented on code in PR #20343:
URL: https://github.com/apache/flink/pull/20343#discussion_r991847890


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -141,6 +142,73 @@ public static Object[] enableObjectReuse() {
                 TimestampData.fromInstant(Instant.parse("2021-03-30T21:00:00Z")))
     };
 
+    public static final RowData[] TEST_DATA_WITH_NULL_TIMESTAMP = {
+        GenericRowData.ofKind(

Review Comment:
   +1, I believe you can reuse the `TEST_DATA` already defined in the class. `TEST_DATA` also has many records, but that's a separate issue.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -382,4 +543,33 @@ public RowData next() {
             }
         }
     }
+
+    private class ReusableIteratorWithNullTimestamp implements Iterator<RowData> {

Review Comment:
   I think the iterator is overkill here. You can test this functionality with a single record: all you need to confirm is that a record with a null timestamp can be flushed.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -297,6 +432,26 @@ public Long timestamp() {
         }
     }
 
+    private void writeDataWithNullTimestamp(
+            ReducingUpsertWriter<?> writer, Iterator<RowData> iterator) throws Exception {
+        while (iterator.hasNext()) {
+            RowData next = iterator.next();
+            writer.write(

Review Comment:
   You can just call this once and invoke flush() afterwards. There's no need to test the buffering logic here, as that is already covered by other tests.
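
   To illustrate the shape of test being suggested, here is a minimal sketch in plain Java. It is not the Flink API: `BufferingWriter`, `Record`, and `writeOnceAndFlush` are hypothetical stand-ins for the writer and row types, used only to show one write with a null timestamp followed by a single flush.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: every type here is a hypothetical stand-in, not a Flink class.
class NullTimestampFlushSketch {

    /** Hypothetical record; the timestamp is a boxed Long so it may be null. */
    static final class Record {
        final String key;
        final String value;
        final Long timestamp;

        Record(String key, String value, Long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical writer that keeps the latest record per key until flush(). */
    static final class BufferingWriter {
        private final Map<String, Record> buffer = new LinkedHashMap<>();
        private final List<Record> sink;

        BufferingWriter(List<Record> sink) {
            this.sink = sink;
        }

        void write(String key, String value, Long timestamp) {
            // A later write for the same key replaces the buffered one.
            buffer.put(key, new Record(key, value, timestamp));
        }

        void flush() {
            // Emit everything buffered so far, then clear the buffer.
            sink.addAll(buffer.values());
            buffer.clear();
        }
    }

    /** Writes one record with a null timestamp, flushes once, returns the sink. */
    static List<Record> writeOnceAndFlush() {
        List<Record> sink = new ArrayList<>();
        BufferingWriter writer = new BufferingWriter(sink);
        writer.write("book-1", "some-title", null); // null timestamp on purpose
        writer.flush();
        return sink;
    }

    public static void main(String[] args) {
        List<Record> flushed = writeOnceAndFlush();
        System.out.println(flushed.size() + " record(s) flushed, timestamp="
                + flushed.get(0).timestamp);
    }
}
```

   The point of the sketch is that one record and one flush are enough to exercise the null-timestamp path; no iterator or dedicated data array is needed.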



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.