Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/22 08:46:28 UTC

[GitHub] [flink] paul8263 opened a new pull request, #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

paul8263 opened a new pull request, #20343:
URL: https://github.com/apache/flink/pull/20343

   … results in Null Pointer Exception
   
   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
Allow flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.WrappedContext to accept a null timestamp, avoiding a NullPointerException when the wrapped SinkWriter.Context carries no timestamp.
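
   The gist of the fix can be sketched as a small standalone class (the names below are illustrative, not Flink's actual API; the real WrappedContext wraps a SinkWriter.Context). The point is to carry the timestamp as a boxed `Long` so "no timestamp" is representable as null, instead of auto-unboxing a null and throwing an NPE:

   ```java
   // Minimal sketch: guarded timestamp handling with a nullable boxed Long.
   // Class and method names are hypothetical, for illustration only.
   class WrappedContextSketch {
       private Long timestamp; // may be null when the record has no timestamp

       // Accepting the boxed type (and tolerating null) avoids the NPE that
       // a primitive `long` parameter would cause via auto-unboxing.
       void setTimestamp(Long timestamp) {
           this.timestamp = timestamp;
       }

       Long timestamp() {
           return timestamp;
       }
   }
   ```

   With this shape, a caller can pass `null` safely: `ctx.setTimestamp(null)` simply records the absence of a timestamp rather than throwing.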
   
   
   ## Brief change log
   
     - flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
     - flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
   
   
   ## Verifying this change
   
   
This change is covered by the test added in this PR: *flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest::testWriteDataWithNullTimestamp()*.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] paul8263 commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by "paul8263 (via GitHub)" <gi...@apache.org>.
paul8263 commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1425254629

   Hi @jaumebecks,
   Thanks for reviewing the PR. I'd like to do the migration.




[GitHub] [flink] paul8263 commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by GitBox <gi...@apache.org>.
paul8263 commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1369346830

   Hi @MartijnVisser and @jaumebecks ,
   
   I updated the unit test and I am currently waiting for the CI result.
   
   If there are other issues that need any changes, please let me know. Thank you very much.




[GitHub] [flink] jaumebecks commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by "jaumebecks (via GitHub)" <gi...@apache.org>.
jaumebecks commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1486296425

   > > Shall we close this PR and move the conversation to [apache/flink-connector-kafka#5](https://github.com/apache/flink-connector-kafka/pull/5)?
   > 
   > yes please. We're removing the Kafka connector code from `apache/flink:main` now.
   
   Can anyone please do this? I see you still push commits @paul8263, can you please move them to [apache/flink-connector-kafka#5](https://github.com/apache/flink-connector-kafka/pull/5)?




[GitHub] [flink] zentol commented on a diff in pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #20343:
URL: https://github.com/apache/flink/pull/20343#discussion_r988711185


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -214,6 +282,73 @@ public void testWriteData() throws Exception {
         assertThat(writer.rowDataCollectors).isEmpty();
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        // write 4 records which doesn't trigger batch size
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(0, 4));
+        assertThat(writer.rowDataCollectors).isEmpty();
+
+        // write one more record, and should flush the buffer
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(7, 1));

Review Comment:
   Is it necessary to split this into 2 calls? What do we gain?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -141,6 +142,73 @@ public static Object[] enableObjectReuse() {
                 TimestampData.fromInstant(Instant.parse("2021-03-30T21:00:00Z")))
     };
 
+    public static final RowData[] TEST_DATA_WITH_NULL_TIMESTAMP = {
+        GenericRowData.ofKind(

Review Comment:
   I don't see why we need so many records. This should already fail even with a single record.
   Reducing the number of required records for the test would remove a lot of noise from the test.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -214,6 +282,73 @@ public void testWriteData() throws Exception {
         assertThat(writer.rowDataCollectors).isEmpty();
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        // write 4 records which doesn't trigger batch size
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(0, 4));
+        assertThat(writer.rowDataCollectors).isEmpty();
+
+        // write one more record, and should flush the buffer
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(7, 1));
+
+        HashMap<Integer, List<RowData>> expected = new HashMap<>();
+        expected.put(
+                1001,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1001,
+                                StringData.fromString("Java public for dummies"),
+                                StringData.fromString("Tan Ah Teck"),
+                                11.11,
+                                11,
+                                null)));
+        expected.put(
+                1002,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1002,
+                                StringData.fromString("More Java for dummies"),
+                                StringData.fromString("Tan Ah Teck"),
+                                22.22,
+                                22,
+                                null)));
+        expected.put(
+                1004,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1004,
+                                StringData.fromString("A Teaspoon of Java"),
+                                StringData.fromString("Kevin Jones"),
+                                55.55,
+                                55,
+                                null)));
+
+        expected.put(
+                1005,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                DELETE,
+                                1005,
+                                StringData.fromString("A Teaspoon of Java 1.8"),
+                                StringData.fromString("Kevin Jones"),
+                                null,
+                                1010,
+                                null)));
+
+        compareCompactedResult(expected, writer.rowDataCollectors);
+
+        writer.rowDataCollectors.clear();
+        // write remaining data, and they are still buffered
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(4, 3));
+        assertThat(writer.rowDataCollectors).isEmpty();

Review Comment:
   Why do we keep testing the buffering?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -214,6 +282,73 @@ public void testWriteData() throws Exception {
         assertThat(writer.rowDataCollectors).isEmpty();
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        // write 4 records which doesn't trigger batch size
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(0, 4));
+        assertThat(writer.rowDataCollectors).isEmpty();
+
+        // write one more record, and should flush the buffer
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(7, 1));
+
+        HashMap<Integer, List<RowData>> expected = new HashMap<>();

Review Comment:
   ```suggestion
           final Map<Integer, List<RowData>> expected = new HashMap<>();
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -214,6 +282,73 @@ public void testWriteData() throws Exception {
         assertThat(writer.rowDataCollectors).isEmpty();
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        // write 4 records which doesn't trigger batch size

Review Comment:
   Why doesn't this trigger batching? Are we relying on some default config value here?





[GitHub] [flink] MartijnVisser commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by "MartijnVisser (via GitHub)" <gi...@apache.org>.
MartijnVisser commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1420875259

   @jaumebecks It should be moved; Flink 1.17 will be the latest release that ships with a Kafka Connector, all new features should go into the externalized repo




[GitHub] [flink] jaumebecks commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by "jaumebecks (via GitHub)" <gi...@apache.org>.
jaumebecks commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1420859875

   Hi @MartijnVisser, now that https://github.com/apache/flink-connector-kafka/pull/1 is merged, what should we do with this PR, shall we move it there, or is it okay to maintain it? Thanks




[GitHub] [flink] jaumebecks commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by "jaumebecks (via GitHub)" <gi...@apache.org>.
jaumebecks commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1420879825

   Thanks @MartijnVisser! @paul8263 wdyt? Will you take over this migration into the new repo, otherwise we can try it




[GitHub] [flink] paul8263 commented on a diff in pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by "paul8263 (via GitHub)" <gi...@apache.org>.
paul8263 commented on code in PR #20343:
URL: https://github.com/apache/flink/pull/20343#discussion_r1089652422


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -261,6 +261,52 @@ public void testFlushDataWhenCheckpointing() throws Exception {
         compareCompactedResult(expected, writer.rowDataCollectors);
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        bufferedWriter.write(
+                GenericRowData.ofKind(
+                        INSERT,
+                        1001,
+                        StringData.fromString("Java public for dummies"),
+                        StringData.fromString("Tan Ah Teck"),
+                        11.11,
+                        11,
+                        null),
+                new org.apache.flink.api.connector.sink2.SinkWriter.Context() {
+                    @Override
+                    public long currentWatermark() {
+                        throw new UnsupportedOperationException("Not implemented.");
+                    }
+
+                    @Override
+                    public Long timestamp() {
+                        return null;
+                    }
+                });
+
+        bufferedWriter.flush(true);
+
+        HashMap<Integer, List<RowData>> expected = new HashMap<>();

Review Comment:
   Thanks @zentol . I updated the code and please help review it again.





Re: [PR] [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer… [flink]

Posted by "MartijnVisser (via GitHub)" <gi...@apache.org>.
MartijnVisser closed pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…
URL: https://github.com/apache/flink/pull/20343




[GitHub] [flink] mhv666 commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by GitBox <gi...@apache.org>.
mhv666 commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1246679458

   @paul8263 I'm currently facing this issue, do you know when it will be merged? Thanks




[GitHub] [flink] MartijnVisser commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1265807076

   @PatrickRen WDYT?




[GitHub] [flink] MartijnVisser commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1351540640

   @jaumebecks Yes, but let's not create that right now. We're in the process of externalizing Kafka (see https://github.com/apache/flink-connector-kafka/pull/1) and when that's done, all PRs should be created in that repo, not in the Flink main repo




Re: [PR] [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer… [flink]

Posted by "MartijnVisser (via GitHub)" <gi...@apache.org>.
MartijnVisser commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1758313688

   @paul8263 The Flink Kafka connector resides in its own repository; if this code change is still relevant, please open the PR in https://github.com/apache/flink-connector-kafka




[GitHub] [flink] zentol commented on a diff in pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #20343:
URL: https://github.com/apache/flink/pull/20343#discussion_r1073590689


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -261,6 +261,52 @@ public void testFlushDataWhenCheckpointing() throws Exception {
         compareCompactedResult(expected, writer.rowDataCollectors);
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        bufferedWriter.write(
+                GenericRowData.ofKind(
+                        INSERT,
+                        1001,
+                        StringData.fromString("Java public for dummies"),
+                        StringData.fromString("Tan Ah Teck"),
+                        11.11,
+                        11,
+                        null),
+                new org.apache.flink.api.connector.sink2.SinkWriter.Context() {
+                    @Override
+                    public long currentWatermark() {
+                        throw new UnsupportedOperationException("Not implemented.");
+                    }
+
+                    @Override
+                    public Long timestamp() {
+                        return null;
+                    }
+                });
+
+        bufferedWriter.flush(true);
+
+        HashMap<Integer, List<RowData>> expected = new HashMap<>();

Review Comment:
   ```suggestion
           final Map<Integer, List<RowData>> expected = new HashMap<>();
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -261,6 +261,52 @@ public void testFlushDataWhenCheckpointing() throws Exception {
         compareCompactedResult(expected, writer.rowDataCollectors);
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        bufferedWriter.write(
+                GenericRowData.ofKind(
+                        INSERT,
+                        1001,
+                        StringData.fromString("Java public for dummies"),
+                        StringData.fromString("Tan Ah Teck"),
+                        11.11,
+                        11,
+                        null),
+                new org.apache.flink.api.connector.sink2.SinkWriter.Context() {
+                    @Override
+                    public long currentWatermark() {
+                        throw new UnsupportedOperationException("Not implemented.");
+                    }
+
+                    @Override
+                    public Long timestamp() {
+                        return null;
+                    }
+                });
+
+        bufferedWriter.flush(true);
+
+        HashMap<Integer, List<RowData>> expected = new HashMap<>();
+        expected.put(
+                1001,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1001,
+                                StringData.fromString("Java public for dummies"),
+                                StringData.fromString("Tan Ah Teck"),
+                                11.11,
+                                11,
+                                null)));
+
+        compareCompactedResult(expected, writer.rowDataCollectors);
+
+        writer.rowDataCollectors.clear();

Review Comment:
   This seems unnecessary. The entire writer will be garbage collected anyway after the test, no?





[GitHub] [flink] jaumebecks commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by GitBox <gi...@apache.org>.
jaumebecks commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1351525105

   @MartijnVisser it seems @paul8263 is not responding to the requested changes, we may create a similar PR in the following days for fixing this, does that sound good to you?




[GitHub] [flink] mas-chen commented on a diff in pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20343:
URL: https://github.com/apache/flink/pull/20343#discussion_r991847890


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -141,6 +142,73 @@ public static Object[] enableObjectReuse() {
                 TimestampData.fromInstant(Instant.parse("2021-03-30T21:00:00Z")))
     };
 
+    public static final RowData[] TEST_DATA_WITH_NULL_TIMESTAMP = {
+        GenericRowData.ofKind(

Review Comment:
   +1, I believe you can reuse `TEST_DATA` already defined in the class. `TEST_DATA` also has many records, but that's a different issue



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -382,4 +543,33 @@ public RowData next() {
             }
         }
     }
+
+    private class ReusableIteratorWithNullTimestamp implements Iterator<RowData> {

Review Comment:
   I think the iterator is overkill here. You can even test this functionality with one record. Basically you want to confirm if a record with a null timestamp can be flushed.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -297,6 +432,26 @@ public Long timestamp() {
         }
     }
 
+    private void writeDataWithNullTimestamp(
+            ReducingUpsertWriter<?> writer, Iterator<RowData> iterator) throws Exception {
+        while (iterator.hasNext()) {
+            RowData next = iterator.next();
+            writer.write(

Review Comment:
   You can just call this once and invoke flush() afterwards. There's even no need to test the buffering logic as that is captured by other tests.





[GitHub] [flink] paul8263 commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by "paul8263 (via GitHub)" <gi...@apache.org>.
paul8263 commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1487976782

   @flinkbot run azure




[GitHub] [flink] jaumebecks commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by GitBox <gi...@apache.org>.
jaumebecks commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1227296392

   Hi, any plans on merging this? :smile:  




[GitHub] [flink] paul8263 commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by GitBox <gi...@apache.org>.
paul8263 commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1369309368

   > @MartijnVisser it seems @paul8263 is not responding to the requested changes, we may create a similar PR in the following days for fixing this, does that sound good to you?
   
   Hi @jaumebecks ,
   Sorry for the late response. I will do it these days.




[GitHub] [flink] flinkbot commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1192341792

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "757251c9004315928c8dc5bdb31f78763a33af6d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "757251c9004315928c8dc5bdb31f78763a33af6d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 757251c9004315928c8dc5bdb31f78763a33af6d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] jaumebecks commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by "jaumebecks (via GitHub)" <gi...@apache.org>.
jaumebecks commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1484710192

   Shall we close this PR and move the conversation to https://github.com/apache/flink-connector-kafka/pull/5?




[GitHub] [flink] tzulitai commented on pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on PR #20343:
URL: https://github.com/apache/flink/pull/20343#issuecomment-1485343974

   > Shall we close this PR and move the conversation to https://github.com/apache/flink-connector-kafka/pull/5?
   
   yes please. We're removing the Kafka connector code from `apache/flink:main` now.

