You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text rendering).
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/23 22:38:15 UTC

[GitHub] [beam] dpcollins-google commented on a change in pull request #16215: [BEAM-13402] Simplify PubsubLiteSink

dpcollins-google commented on a change in pull request #16215:
URL: https://github.com/apache/beam/pull/16215#discussion_r774800962



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
##########
@@ -46,98 +41,44 @@
   private final PublisherOptions options;
 
   @GuardedBy("this")
-  private transient PublisherOrError publisherOrError;
-
-  // Whenever outstanding is decremented, notify() must be called.
-  @GuardedBy("this")
-  private transient int outstanding;
-
-  @GuardedBy("this")
-  private transient Deque<CheckedApiException> errorsSinceLastFinish;
+  private transient RunState runState;
 
   public PubsubLiteSink(PublisherOptions options) {
     this.options = options;
   }
 
-  @Setup
-  public void setup() throws ApiException {
-    Publisher<MessageMetadata> publisher;
-    publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
-    synchronized (this) {
-      outstanding = 0;
-      errorsSinceLastFinish = new ArrayDeque<>();
-      publisherOrError = PublisherOrError.ofPublisher(publisher);
+  private static class RunState {
+    private final Deque<ApiFuture<MessageMetadata>> futures = new ArrayDeque<>();
+
+    private final Publisher<MessageMetadata> publisher;
+
+    RunState(PublisherOptions options) {
+      publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
+    }
+
+    void publish(PubSubMessage message) {
+      futures.add(publisher.publish(Message.fromProto(message)));
+    }
+
+    void waitForDone() throws Exception {
+      ApiFutures.allAsList(futures).get(1, MINUTES);
     }
-    // cannot declare in inner class since 'this' means something different.
-    Consumer<Throwable> onFailure =
-        t -> {
-          synchronized (this) {
-            publisherOrError = PublisherOrError.ofError(ExtractStatus.toCanonical(t));
-          }
-        };
-    publisher.addListener(
-        new Listener() {
-          @Override
-          public void failed(State s, Throwable t) {
-            onFailure.accept(t);
-          }
-        },
-        SystemExecutors.getFuturesExecutor());
   }
 
-  private synchronized void decrementOutstanding() {
-    --outstanding;
-    notify();
+  @StartBundle
+  public synchronized void startBundle() throws ApiException {
+    runState = new RunState(options);
   }
 
   @ProcessElement
   public synchronized void processElement(@Element PubSubMessage message)
       throws CheckedApiException {
-    ++outstanding;
-    if (publisherOrError.getKind() == Kind.ERROR) {
-      throw publisherOrError.error();
-    }
-    ApiFuture<MessageMetadata> future =
-        publisherOrError.publisher().publish(Message.fromProto(message));
-    // cannot declare in inner class since 'this' means something different.
-    Consumer<Throwable> onFailure =
-        t -> {
-          synchronized (this) {
-            decrementOutstanding();
-            errorsSinceLastFinish.push(ExtractStatus.toCanonical(t));
-          }
-        };
-    ApiFutures.addCallback(
-        future,
-        new ApiFutureCallback<MessageMetadata>() {
-          @Override
-          public void onSuccess(MessageMetadata messageMetadata) {
-            decrementOutstanding();
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            onFailure.accept(t);
-          }
-        },
-        SystemExecutors.getFuturesExecutor());
+    runState.publish(message);
   }
 
   // Intentionally don't flush on bundle finish to allow multi-sink client reuse.

Review comment:
       Neither is explicitly calling flush() on the client since that would also block on the completion of other SDF instances' messages.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org