Posted to issues@flink.apache.org by "tzulitai (via GitHub)" <gi...@apache.org> on 2023/04/05 22:16:24 UTC

[GitHub] [flink-connector-kafka] tzulitai opened a new pull request, #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

tzulitai opened a new pull request, #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22

   This is a follow-up to the previous work in https://github.com/apache/flink/pull/21808, which added boundedness options for the SQL Kafka Connector. This PR does the same for the SQL Upsert Kafka Connector.
   
   Please see new tests in `UpsertKafkaTableITCase` for end-to-end usage examples of the new boundedness options and the expected behaviour.
   
   Most of the added tests reference what was added for the SQL Kafka Connector in https://github.com/apache/flink/pull/21808.
   
   ## Changelog
   
   1. Expose `scan.bounded.*` options for upsert Kafka (see the DDL sketch after this list)
   2. Add unit tests for source configuration, as well as end-to-end integration tests with boundedness enabled
   3. Minor refactorings (tagged `[hotfix]` commits) to share test utilities across the tests for SQL Kafka Connector and SQL Upsert Kafka Connector.
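
   As a quick illustration of item 1, a minimal DDL sketch (the table name, topic, and
   bootstrap servers are invented here, and the option names are assumed to mirror the
   ones the SQL Kafka Connector gained in the linked PR):

       CREATE TABLE bounded_upsert_example (
         `user_id` BIGINT,
         `payload` STRING,
         PRIMARY KEY (user_id) NOT ENFORCED
       ) WITH (
         'connector' = 'upsert-kafka',
         'topic' = 'example_topic',
         'properties.bootstrap.servers' = 'localhost:9092',
         'key.format' = 'json',
         'value.format' = 'json',
         -- stop reading once all partitions reach the given timestamp
         'scan.bounded.mode' = 'timestamp',
         'scan.bounded.timestamp-millis' = '1680000000000'
       );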
   
   ## Testing
   
   Covered in new tests in `UpsertKafkaDynamicTableFactoryTest` and `UpsertKafkaTableITCase`.


[GitHub] [flink-connector-kafka] tzulitai commented on pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on PR #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22#issuecomment-1498264961

   cc @twalthr @dawidwys @Gerrrr for review, thank you :)


[GitHub] [flink-connector-kafka] tzulitai commented on pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on PR #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22#issuecomment-1499280862

   Thank you for the review @Gerrrr @dawidwys! Merging this ...


[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on code in PR #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159161297


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java:
##########
@@ -145,7 +145,7 @@ public static void validateSinkTopic(ReadableConfig tableOptions) {
         }
     }
 
-    private static void validateScanStartupMode(ReadableConfig tableOptions) {

Review Comment:
   Changed this by mistake, will revert.



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on code in PR #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159113822


##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##########
@@ -388,6 +391,201 @@ public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception {
         deleteTestTopic(topic);
     }
 
+    @Test
+    public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception {
+        final String topic = "bounded_upsert_" + format + "_" + UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Produce an event time stream into Kafka -------------------
+        final String bootstraps = getBootstrapServers();
+
+        // table with upsert-kafka connector, bounded mode up to offset=2
+        final String createTableSql =
+                String.format(
+                        "CREATE TABLE upsert_kafka (\n"
+                                + "  `user_id` BIGINT,\n"
+                                + "  `event_id` BIGINT,\n"
+                                + "  `payload` STRING,\n"
+                                + "  PRIMARY KEY (event_id, user_id) NOT ENFORCED"
+                                + ") WITH (\n"
+                                + "  'connector' = 'upsert-kafka',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'key.format' = '%s',\n"
+                                + "  'value.format' = '%s',\n"
+                                + "  'value.fields-include' = 'ALL',\n"
+                                + "  'scan.bounded.mode' = 'specific-offsets',\n"
+                                + "  'scan.bounded.specific-offsets' = 'partition:0,offset:2'"
+                                + ")",
+                        topic, bootstraps, format, format);
+        tEnv.executeSql(createTableSql);
+
+        // insert multiple values to have more records past offset=2
+        final String insertValuesSql =
+                "INSERT INTO upsert_kafka\n"
+                        + "VALUES\n"
+                        + " (1, 100, 'payload 1'),\n"
+                        + " (2, 101, 'payload 2'),\n"
+                        + " (3, 102, 'payload 3'),\n"
+                        + " (1, 100, 'payload')";
+        tEnv.executeSql(insertValuesSql).await();
+
+        // results should only have records up to offset=2
+        final List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from upsert_kafka"));
+        final List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 1L, 100L, "payload 1"),
+                        changelogRow("+I", 2L, 101L, "payload 2"));

Review Comment:
   Note: with boundedness set using specific offsets, would there be semantic issues if the end offset happens to fall between a `-U` and a `+U`?



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on code in PR #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159168995


##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##########
(same `testUpsertKafkaSourceSinkWithBoundedSpecificOffsets` hunk as quoted earlier in this thread)

Review Comment:
   https://github.com/apache/flink-connector-kafka/pull/22/commits/61e919ecca80bd094bcc431c6f298faa52c6f5c0
   
   this new commit clears up my confusion :)



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on code in PR #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159159130


##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##########
(same `testUpsertKafkaSourceSinkWithBoundedSpecificOffsets` hunk as quoted earlier in this thread)

Review Comment:
   Ah, yes, you're right. Another one of those mind-twists I'm still trying to wrap my head around 😅
   So the case I mentioned can never happen: if the end offset is right after a `+I` that is semantically an update, the consumer side will normalize it into a `-U` and `+U` pair. I'll update the test to cover this case.
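
   For illustration, a hypothetical trace of the point above (offsets and values are
   invented for this sketch, and `ChangelogNormalize` is the planner node I mean by
   "the consumer side"):

       partition 0 (upsert-kafka physically stores only plain upserts and tombstones):
         offset 0: key=(100, 1), value=(1, 100, 'payload 1')   <- insert
         offset 1: key=(100, 1), value=(1, 100, 'payload 1b')  <- update, stored as an upsert
         offset 2: key=(101, 2), value=(2, 101, 'payload 2')

       with 'scan.bounded.specific-offsets' = 'partition:0,offset:2' the source stops
       before offset 2, and normalization turns the two raw upserts into:
         +I (1, 100, 'payload 1')
         -U (1, 100, 'payload 1')
         +U (1, 100, 'payload 1b')

       i.e. the -U/+U pair is derived entirely downstream of the bound, so an end
       offset can never split it.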



[GitHub] [flink-connector-kafka] Gerrrr commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

Posted by "Gerrrr (via GitHub)" <gi...@apache.org>.
Gerrrr commented on code in PR #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159122981


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java:
##########
@@ -145,7 +145,7 @@ public static void validateSinkTopic(ReadableConfig tableOptions) {
         }
     }
 
-    private static void validateScanStartupMode(ReadableConfig tableOptions) {

Review Comment:
   nit: we can keep this `private`, as this PR does not add the scan startup mode to upsert-kafka.



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java:
##########
@@ -189,7 +189,7 @@ private static void validateScanStartupMode(ReadableConfig tableOptions) {
                         });
     }
 
-    private static void validateScanBoundedMode(ReadableConfig tableOptions) {

Review Comment:
   nit: package-private is probably sufficient here
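
   A compilable sketch of both suggestions (class name invented, bodies elided):

       import org.apache.flink.configuration.ReadableConfig;

       // hypothetical skeleton showing the visibility the two nits suggest
       final class OptionsValidationSketch {
           // stays private: this PR does not expose startup mode for upsert-kafka
           private static void validateScanStartupMode(ReadableConfig tableOptions) {}

           // package-private: callable from the upsert-kafka factory in the same package
           static void validateScanBoundedMode(ReadableConfig tableOptions) {}
       }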



[GitHub] [flink-connector-kafka] Gerrrr commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

Posted by "Gerrrr (via GitHub)" <gi...@apache.org>.
Gerrrr commented on code in PR #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159155943


##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##########
(same `testUpsertKafkaSourceSinkWithBoundedSpecificOffsets` hunk as quoted earlier in this thread)

Review Comment:
   Doesn't upsert-kafka always insert `I` and `D` and derive `-U` and `+U` on the consumer side via a normalization node?



[GitHub] [flink-connector-kafka] dawidwys commented on a diff in pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

Posted by "dawidwys (via GitHub)" <gi...@apache.org>.
dawidwys commented on code in PR #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159372779


##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java:
##########
@@ -390,6 +399,131 @@ private SerializationSchema<RowData> createConfluentAvroSerSchema(
                 RowDataToAvroConverters.createConverter(rowType));
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Bounded end-offset tests
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    public void testBoundedSpecificOffsetsValidate() {
+        final Map<String, String> options = getFullSourceOptions();
+        options.put(
+                KafkaConnectorOptions.SCAN_BOUNDED_MODE.key(),
+                ScanBoundedMode.SPECIFIC_OFFSETS.toString());
+
+        assertThatThrownBy(() -> createTableSource(SOURCE_SCHEMA, options))
+                .isInstanceOf(ValidationException.class);

Review Comment:
   Could we assert on the message as well, to make sure the validation error is raised for the expected reason?
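
   For example (the message fragment below is only a guess at the validation wording):

       assertThatThrownBy(() -> createTableSource(SOURCE_SCHEMA, options))
               .isInstanceOf(ValidationException.class)
               // hypothetical fragment; use whatever the real error text contains
               .hasMessageContaining("scan.bounded.specific-offsets");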



[GitHub] [flink-connector-kafka] tzulitai closed pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai closed pull request #22: [FLINK-31740] [upsert-kafka] Allows setting boundedness for SQL Upsert Kafka Connector
URL: https://github.com/apache/flink-connector-kafka/pull/22


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org