Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/26 14:02:35 UTC

[GitHub] [beam] johnjcasey opened a new pull request, #22450: 21730 fix offset resetting

johnjcasey opened a new pull request, #22450:
URL: https://github.com/apache/beam/pull/22450

   Removed code that generated a random group name.
   Added an integration test that verifies we don't re-process elements.
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] chamikaramj commented on a diff in pull request #22450: 21730 fix offset resetting

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #22450:
URL: https://github.com/apache/beam/pull/22450#discussion_r930545422


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -239,6 +252,94 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException {
     }
   }
 
+  // Because of existing limitations in streaming testing, this is verified via a combination of
+  // DoFns.  CrashOnExtra will throw an exception if we see any extra records beyond those we
+  // expect, and LogFn acts as a sink we can inspect using ExpectedLogs to verify that we got all
+  // those we expect.
+  @Test
+  public void testKafkaIOSDFResumesCorrectly() throws IOException {
+    roundtripElements("first-pass", 4, writePipeline, sdfReadPipeline);
+    roundtripElements("second-pass", 3, writePipeline2, sdfReadPipeline2);
+  }
+
+  private void roundtripElements(
+      String recordPrefix, Integer recordCount, TestPipeline wPipeline, TestPipeline rPipeline)
+      throws IOException {
+    AdminClient client =
+        AdminClient.create(
+            ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
+    client.listTopics();
+    Map<Integer, String> records = new HashMap<>();
+    for (int i = 0; i < recordCount; i++) {
+      records.put(i, recordPrefix + "-" + i);
+    }
+
+    wPipeline
+        .apply("Generate Write Elements", Create.of(records))
+        .apply(
+            "Write to Kafka",
+            KafkaIO.<Integer, String>write()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withTopic(options.getKafkaTopic() + "-resuming")
+                .withKeySerializer(IntegerSerializer.class)
+                .withValueSerializer(StringSerializer.class));
+
+    wPipeline.run().waitUntilFinish(Duration.standardSeconds(10));
+
+    rPipeline
+        .apply(
+            "Read from Kafka",
+            KafkaIO.<Integer, String>read()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withConsumerConfigUpdates(
+                    ImmutableMap.of(
+                        "group.id",
+                        "resuming-group",
+                        "auto.offset.reset",
+                        "earliest",
+                        "enable.auto.commit",
+                        "true"))
+                .withTopic(options.getKafkaTopic() + "-resuming")
+                .withKeyDeserializer(IntegerDeserializer.class)
+                .withValueDeserializer(StringDeserializer.class)
+                .withoutMetadata())
+        .apply("Get Values", Values.create())
+        .apply(ParDo.of(new CrashOnExtra(records.values())))
+        .apply(ParDo.of(new LogFn()));
+
+    rPipeline.run().waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    for (String value : records.values()) {
+      kafkaIOITExpectedLogs.verifyError(value);
+    }
+  }
+
+  public static class CrashOnExtra extends DoFn<String, String> {
+    final Set<String> expected;
+
+    public CrashOnExtra(Collection<String> records) {
+      expected = new HashSet<>(records);
+    }
+
+    @ProcessElement
+    public void processElement(@Element String element, OutputReceiver<String> outputReceiver) {
+      if (!expected.contains(element)) {
+        throw new RuntimeException("Received unexpected element: " + element);
+      } else {
+        expected.remove(element);

Review Comment:
   Also confirm that `expected` contains the element?
   Otherwise this would pass trivially for a run that does not read any data.
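
   The check being asked for here can be modelled outside Beam. The following standalone sketch (the `RecordTracker` class and its method names are hypothetical, not code from the PR) tracks expected records in a set, fails on any extra record, and also fails if any expected record was never seen, so a run that reads nothing cannot pass trivially:

   ```java
   import java.util.*;

   // Hypothetical standalone model of the suggested check: remove each
   // observed record from the expected set, throw on extras, and verify the
   // set is empty at the end so an empty run still fails.
   class RecordTracker {
       private final Set<String> expected;

       RecordTracker(Collection<String> records) {
           expected = new HashSet<>(records);
       }

       // Called once per element read from the pipeline.
       void observe(String element) {
           if (!expected.remove(element)) {
               throw new RuntimeException("Received unexpected element: " + element);
           }
       }

       // Called after the pipeline drains; fails if anything was never read.
       void verifyAllSeen() {
           if (!expected.isEmpty()) {
               throw new IllegalStateException("Missing elements: " + expected);
           }
       }
   }
   ```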





[GitHub] [beam] johnjcasey commented on pull request #22450: 21730 fix offset resetting

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22450:
URL: https://github.com/apache/beam/pull/22450#issuecomment-1195840287

   run java precommit




[GitHub] [beam] github-actions[bot] commented on pull request #22450: 21730 fix offset resetting

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22450:
URL: https://github.com/apache/beam/pull/22450#issuecomment-1195950545

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] github-actions[bot] commented on pull request #22450: 21730 fix offset resetting

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22450:
URL: https://github.com/apache/beam/pull/22450#issuecomment-1195604023

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kileys for label java.
   R: @Abacn for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] johnjcasey commented on pull request #22450: 21730 fix offset resetting

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22450:
URL: https://github.com/apache/beam/pull/22450#issuecomment-1195527584

   @chamikaramj I figured out a way to test streaming. It's not super elegant, but it works for now.




[GitHub] [beam] johnjcasey commented on pull request #22450: 21730 fix offset resetting

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #22450:
URL: https://github.com/apache/beam/pull/22450#issuecomment-1195936362

   r: @chamikaramj 




[GitHub] [beam] chamikaramj commented on a diff in pull request #22450: 21730 fix offset resetting

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #22450:
URL: https://github.com/apache/beam/pull/22450#discussion_r930547619


Review Comment:
   Actually we might need to write the output to a sink (e.g., a file) and read it back after the pipeline finishes, to verify that this was not a trivial execution.
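
   As a rough illustration of the sink-and-read-back idea (a plain-Java sketch; `FileSinkCheck` and its methods are hypothetical helpers, not Beam or KafkaIO APIs), each observed record is appended to a file, and after the pipeline finishes the test reads the file back and fails unless every expected record is present:

   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.*;
   import java.util.*;

   // Hypothetical sketch of file-based verification: the read side appends
   // each record it sees to a file, and the test inspects the file afterwards,
   // proving the run was not a trivial execution that read nothing.
   class FileSinkCheck {
       // Append one record to the sink file (would be called per element).
       static void record(Path sink, String element) {
           try {
               Files.write(sink, (element + System.lineSeparator()).getBytes(),
                       StandardOpenOption.CREATE, StandardOpenOption.APPEND);
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }

       // After the pipeline drains, fail unless every expected record was written.
       static void verify(Path sink, Collection<String> expected) {
           try {
               Set<String> seen = new HashSet<>(Files.readAllLines(sink));
               if (!seen.containsAll(expected)) {
                   throw new IllegalStateException("Trivial or incomplete run; expected " + expected);
               }
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }
   ```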





[GitHub] [beam] johnjcasey commented on a diff in pull request #22450: 21730 fix offset resetting

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #22450:
URL: https://github.com/apache/beam/pull/22450#discussion_r931071851


Review Comment:
   I do that via the log and expected logs. It's essentially a workaround sink.
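
   The log-as-sink workaround can be sketched standalone with `java.util.logging` (the `LogCapture` class below is a hypothetical analog of Beam's `ExpectedLogs` test rule, not the actual implementation): a handler captures every message the pipeline logs, and the test asserts that each expected record appears among the captured messages.

   ```java
   import java.util.*;
   import java.util.logging.*;

   // Hypothetical stand-in for ExpectedLogs: capture every published log
   // message so the test can verify each expected record was logged.
   class LogCapture extends Handler {
       final List<String> messages = new ArrayList<>();

       @Override public void publish(LogRecord record) { messages.add(record.getMessage()); }
       @Override public void flush() {}
       @Override public void close() {}

       // Fail unless every expected value was logged at some point.
       void verifyLogged(Collection<String> expected) {
           if (!messages.containsAll(expected)) {
               throw new IllegalStateException("Missing logged records: " + expected);
           }
       }
   }
   ```

   A `LogFn`-style DoFn would log each element it receives; attaching a handler like this to the relevant logger lets the test treat the log as a readable sink.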





[GitHub] [beam] chamikaramj commented on a diff in pull request #22450: 21730 fix offset resetting

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #22450:
URL: https://github.com/apache/beam/pull/22450#discussion_r931168600


Review Comment:
   Ah ok. Makes sense.





[GitHub] [beam] chamikaramj merged pull request #22450: 21730 fix offset resetting

Posted by GitBox <gi...@apache.org>.
chamikaramj merged PR #22450:
URL: https://github.com/apache/beam/pull/22450

