Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/06 08:33:47 UTC

[GitHub] [flink] zentol commented on a diff in pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

zentol commented on code in PR #20343:
URL: https://github.com/apache/flink/pull/20343#discussion_r988711185


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -214,6 +282,73 @@ public void testWriteData() throws Exception {
         assertThat(writer.rowDataCollectors).isEmpty();
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        // write 4 records which doesn't trigger batch size
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(0, 4));
+        assertThat(writer.rowDataCollectors).isEmpty();
+
+        // write one more record, and should flush the buffer
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(7, 1));

Review Comment:
   Is it necessary to split this into 2 calls? What do we gain?
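   If a single call would do, something roughly like this sketch might be enough (an assumption: a contiguous slice of the test data is acceptable for the assertions below):
   ```java
   // write enough records in one pass to exceed the batch size and force a flush
   writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(0, 5));
   ```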



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -141,6 +142,73 @@ public static Object[] enableObjectReuse() {
                 TimestampData.fromInstant(Instant.parse("2021-03-30T21:00:00Z")))
     };
 
+    public static final RowData[] TEST_DATA_WITH_NULL_TIMESTAMP = {
+        GenericRowData.ofKind(

Review Comment:
   I don't see why we need so many records. This should already fail even with a single record.
   Reducing the number of required records would remove a lot of noise from the test.
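   A single-record fixture could look roughly like this (a sketch only; the field values are borrowed from the expected map asserted later in the test, and the exact schema of `TEST_DATA_WITH_NULL_TIMESTAMP` is an assumption):
   ```java
   public static final RowData[] TEST_DATA_WITH_NULL_TIMESTAMP = {
       GenericRowData.ofKind(
               UPDATE_AFTER,
               1001,
               StringData.fromString("Java public for dummies"),
               StringData.fromString("Tan Ah Teck"),
               11.11,
               11,
               null) // null timestamp column under test
   };
   ```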



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -214,6 +282,73 @@ public void testWriteData() throws Exception {
         assertThat(writer.rowDataCollectors).isEmpty();
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        // write 4 records which doesn't trigger batch size
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(0, 4));
+        assertThat(writer.rowDataCollectors).isEmpty();
+
+        // write one more record, and should flush the buffer
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(7, 1));
+
+        HashMap<Integer, List<RowData>> expected = new HashMap<>();
+        expected.put(
+                1001,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1001,
+                                StringData.fromString("Java public for dummies"),
+                                StringData.fromString("Tan Ah Teck"),
+                                11.11,
+                                11,
+                                null)));
+        expected.put(
+                1002,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1002,
+                                StringData.fromString("More Java for dummies"),
+                                StringData.fromString("Tan Ah Teck"),
+                                22.22,
+                                22,
+                                null)));
+        expected.put(
+                1004,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1004,
+                                StringData.fromString("A Teaspoon of Java"),
+                                StringData.fromString("Kevin Jones"),
+                                55.55,
+                                55,
+                                null)));
+
+        expected.put(
+                1005,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                DELETE,
+                                1005,
+                                StringData.fromString("A Teaspoon of Java 1.8"),
+                                StringData.fromString("Kevin Jones"),
+                                null,
+                                1010,
+                                null)));
+
+        compareCompactedResult(expected, writer.rowDataCollectors);
+
+        writer.rowDataCollectors.clear();
+        // write remaining data, and they are still buffered
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(4, 3));
+        assertThat(writer.rowDataCollectors).isEmpty();

Review Comment:
   Why do we keep testing the buffering?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -214,6 +282,73 @@ public void testWriteData() throws Exception {
         assertThat(writer.rowDataCollectors).isEmpty();
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        // write 4 records which doesn't trigger batch size
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(0, 4));
+        assertThat(writer.rowDataCollectors).isEmpty();
+
+        // write one more record, and should flush the buffer
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(7, 1));
+
+        HashMap<Integer, List<RowData>> expected = new HashMap<>();

Review Comment:
   ```suggestion
           final Map<Integer, List<RowData>> expected = new HashMap<>();
   ```
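   Declaring the variable against the `Map` interface (plus `final`) keeps it independent of the concrete implementation; only the right-hand side needs to name `HashMap`.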



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -214,6 +282,73 @@ public void testWriteData() throws Exception {
         assertThat(writer.rowDataCollectors).isEmpty();
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        // write 4 records which doesn't trigger batch size

Review Comment:
   Why doesn't this trigger batching? Are we relying on some default config value here?
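   If this is relying on a default, pinning the threshold in the test would make the intent explicit, e.g. (a sketch; the `createBufferedWriter` overload taking a flush mode and the concrete values are assumptions):
   ```java
   // flush once 5 rows are buffered, or after 1s, whichever comes first
   final ReducingUpsertWriter<?> bufferedWriter =
           createBufferedWriter(writer, new SinkBufferFlushMode(5, 1000L));
   ```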



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org