Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/25 00:14:49 UTC

[GitHub] [beam] boyuanzz commented on a change in pull request #14069: [BEAM-10114] Fix PerSubscriptionPartitionSdf to not rely on the presence of BundleFinalizer

boyuanzz commented on a change in pull request #14069:
URL: https://github.com/apache/beam/pull/14069#discussion_r582398133



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
##########
@@ -81,15 +84,22 @@ public ProcessContinuation processElement(
           .lastClaimed()
           .ifPresent(
               lastClaimedOffset ->
-                  finalizer.afterBundleCommit(
-                      Instant.ofEpochMilli(Long.MAX_VALUE),
-                      () -> {
-                        Committer committer = committerFactory.apply(subscriptionPartition);
-                        committer.startAsync().awaitRunning();
-                        // Commit the next-to-deliver offset.
-                        committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
-                        committer.stopAsync().awaitTerminated();
-                      }));
+              /* TODO(boyuanzz): When default dataflow can use finalizers, undo this.
+              finalizer.afterBundleCommit(
+                  Instant.ofEpochMilli(Long.MAX_VALUE),
+                  () -> */ {
+                Committer committer = committerFactory.apply(subscriptionPartition);
+                committer.startAsync().awaitRunning();
+                // Commit the next-to-deliver offset.
+                try {
+                  committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();

Review comment:
       There are several concerns with committing the offset inside the `DoFn`:
   
   - Dataflow may fail while processing an element and then retry it. In that case, the same offset may be committed more than once.
   - An offset may be committed successfully, but the pipeline may crash before downstream actually consumes the output. That could result in data loss.
   
   If these concerns don't apply to your source, then committing here is OK.
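
To make the two concerns concrete, here is a minimal sketch. It is a simulation only and does not use the Pub/Sub Lite or Beam APIs: `FakePartition`, `attempt`, and the flags `commitBeforeDurable` and `crashBeforeDurable` are hypothetical names, and the sketch assumes that a restarted pipeline resumes from the committed cursor and that a commit is a plain overwrite of that cursor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommitOrderingSketch {

  /** Hypothetical stand-in for one subscription partition with a committed cursor. */
  static final class FakePartition {
    final List<String> messages;
    long committedOffset = 0; // next-to-deliver offset, as in the diff above
    int commitCalls = 0;

    FakePartition(List<String> messages) {
      this.messages = messages;
    }

    void commit(long nextToDeliver) {
      committedOffset = nextToDeliver; // plain overwrite (an assumption of this sketch)
      commitCalls++;
    }
  }

  /**
   * One processing attempt over offsets [from, to).
   * Returns true if the output became durable downstream.
   */
  static boolean attempt(
      FakePartition p,
      long from,
      long to,
      List<String> durableOutput,
      boolean commitBeforeDurable,
      boolean crashBeforeDurable) {
    List<String> pending = new ArrayList<>(p.messages.subList((int) from, (int) to));
    if (commitBeforeDurable) {
      p.commit(to); // what committing inside processElement amounts to
    }
    if (crashBeforeDurable) {
      return false; // pending output is dropped
    }
    durableOutput.addAll(pending);
    if (!commitBeforeDurable) {
      p.commit(to); // what a bundle finalizer would do
    }
    return true;
  }

  public static void main(String[] args) {
    List<String> data = Arrays.asList("m0", "m1", "m2");

    // Concern 1: the runner retries the same range, so the same offset is committed twice.
    FakePartition retried = new FakePartition(data);
    List<String> out1 = new ArrayList<>();
    attempt(retried, 0, 3, out1, true, true);
    attempt(retried, 0, 3, out1, true, false);
    System.out.println("commit calls: " + retried.commitCalls);

    // Concern 2: commit first, crash, then restart from the committed cursor.
    FakePartition early = new FakePartition(data);
    List<String> out2 = new ArrayList<>();
    attempt(early, 0, 3, out2, true, true);
    attempt(early, early.committedOffset, 3, out2, true, false);
    System.out.println("commit-first output: " + out2);

    // Contrast: commit only after the output is durable.
    FakePartition late = new FakePartition(data);
    List<String> out3 = new ArrayList<>();
    attempt(late, 0, 3, out3, false, true);
    attempt(late, late.committedOffset, 3, out3, false, false);
    System.out.println("commit-last output: " + out3);
  }
}
```

In this simulation the repeated commit is harmless only because the fake commit overwrites the cursor with the same value; whether that holds for the real committer is a question for the source. The commit-first run ends with no durable output even though the cursor has advanced past all three messages, which is the data-loss case described above.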



