You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/15 18:50:31 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #16215: [BEAM-13402] Simplify PubsubLiteSink

TheNeuralBit commented on a change in pull request #16215:
URL: https://github.com/apache/beam/pull/16215#discussion_r769880465



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java
##########
@@ -27,9 +27,13 @@
 import java.util.HashMap;
 import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A map of working publishers by PublisherOptions. */
 class PublisherCache implements AutoCloseable {
+  private final Logger logger = LoggerFactory.getLogger(PublisherCache.class);

Review comment:
       nit: we usually make loggers static and call them LOG:
   
   ```suggestion
     private static final Logger LOG = LoggerFactory.getLogger(PublisherCache.class);
   ```
   
   ```
   ❯ grep -iIrn LoggerFactory\.getLogger sdks/java | grep private | cut -d' ' -f2- |  cut -d = -f-1 | awk '{$1=$1};1' | sort | uniq -c | sort -n
         1 private final Logger log
         1 private static final org.slf4j.Logger LOG
       263 private static final Logger LOG
   ```

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
##########
@@ -46,98 +41,44 @@
   private final PublisherOptions options;
 
   @GuardedBy("this")
-  private transient PublisherOrError publisherOrError;
-
-  // Whenever outstanding is decremented, notify() must be called.
-  @GuardedBy("this")
-  private transient int outstanding;
-
-  @GuardedBy("this")
-  private transient Deque<CheckedApiException> errorsSinceLastFinish;
+  private transient RunState runState;
 
   public PubsubLiteSink(PublisherOptions options) {
     this.options = options;
   }
 
-  @Setup
-  public void setup() throws ApiException {
-    Publisher<MessageMetadata> publisher;
-    publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
-    synchronized (this) {
-      outstanding = 0;
-      errorsSinceLastFinish = new ArrayDeque<>();
-      publisherOrError = PublisherOrError.ofPublisher(publisher);
+  private static class RunState {
+    private final Deque<ApiFuture<MessageMetadata>> futures = new ArrayDeque<>();
+
+    private final Publisher<MessageMetadata> publisher;
+
+    RunState(PublisherOptions options) {
+      publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
+    }
+
+    void publish(PubSubMessage message) {
+      futures.add(publisher.publish(Message.fromProto(message)));
+    }
+
+    void waitForDone() throws Exception {
+      ApiFutures.allAsList(futures).get(1, MINUTES);
     }
-    // cannot declare in inner class since 'this' means something different.
-    Consumer<Throwable> onFailure =
-        t -> {
-          synchronized (this) {
-            publisherOrError = PublisherOrError.ofError(ExtractStatus.toCanonical(t));
-          }
-        };
-    publisher.addListener(
-        new Listener() {
-          @Override
-          public void failed(State s, Throwable t) {
-            onFailure.accept(t);
-          }
-        },
-        SystemExecutors.getFuturesExecutor());
   }
 
-  private synchronized void decrementOutstanding() {
-    --outstanding;
-    notify();
+  @StartBundle
+  public synchronized void startBundle() throws ApiException {
+    runState = new RunState(options);
   }
 
   @ProcessElement
   public synchronized void processElement(@Element PubSubMessage message)
       throws CheckedApiException {
-    ++outstanding;
-    if (publisherOrError.getKind() == Kind.ERROR) {
-      throw publisherOrError.error();
-    }
-    ApiFuture<MessageMetadata> future =
-        publisherOrError.publisher().publish(Message.fromProto(message));
-    // cannot declare in inner class since 'this' means something different.
-    Consumer<Throwable> onFailure =
-        t -> {
-          synchronized (this) {
-            decrementOutstanding();
-            errorsSinceLastFinish.push(ExtractStatus.toCanonical(t));
-          }
-        };
-    ApiFutures.addCallback(
-        future,
-        new ApiFutureCallback<MessageMetadata>() {
-          @Override
-          public void onSuccess(MessageMetadata messageMetadata) {
-            decrementOutstanding();
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            onFailure.accept(t);
-          }
-        },
-        SystemExecutors.getFuturesExecutor());
+    runState.publish(message);
   }
 
   // Intentionally don't flush on bundle finish to allow multi-sink client reuse.

Review comment:
       Is this comment accurate? It looks like both the old and new implementations flush on bundle finish.

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSinkTest.java
##########
@@ -207,32 +189,4 @@ public void exceptionMixedWithOK() throws Exception {
     assertThat(statusOr.get().code(), equalTo(Code.INTERNAL));
     exec.shutdownNow();
   }
-
-  @Test
-  public void listenerExceptionOnBundleFinish() throws Exception {
-    Message message1 = Message.builder().build();
-    SettableApiFuture<MessageMetadata> future = SettableApiFuture.create();
-
-    SettableApiFuture<Void> publishFuture = SettableApiFuture.create();
-    when(publisher.publish(message1))
-        .thenAnswer(
-            args -> {
-              publishFuture.set(null);
-              return future;
-            });
-    Future<?> executorFuture =
-        Executors.newSingleThreadExecutor()
-            .submit(
-                () -> {
-                  PipelineExecutionException e =
-                      assertThrows(PipelineExecutionException.class, () -> runWith(message1));
-                  Optional<CheckedApiException> statusOr = ExtractStatus.extract(e.getCause());
-                  assertTrue(statusOr.isPresent());
-                  assertThat(statusOr.get().code(), equalTo(Code.INTERNAL));
-                });
-    publishFuture.get();
-    listener.failed(null, new CheckedApiException(Code.INTERNAL).underlying);
-    future.set(MessageMetadata.of(Partition.of(1), Offset.of(2)));
-    executorFuture.get();
-  }

Review comment:
       Is this testing a case that we need to be concerned about? Why don't we need the Publisher.addListener logic in PubsubLiteSink anymore?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
##########
@@ -46,98 +41,44 @@
   private final PublisherOptions options;
 
   @GuardedBy("this")
-  private transient PublisherOrError publisherOrError;
-
-  // Whenever outstanding is decremented, notify() must be called.
-  @GuardedBy("this")
-  private transient int outstanding;
-
-  @GuardedBy("this")
-  private transient Deque<CheckedApiException> errorsSinceLastFinish;
+  private transient RunState runState;
 
   public PubsubLiteSink(PublisherOptions options) {
     this.options = options;
   }
 
-  @Setup
-  public void setup() throws ApiException {
-    Publisher<MessageMetadata> publisher;
-    publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
-    synchronized (this) {
-      outstanding = 0;
-      errorsSinceLastFinish = new ArrayDeque<>();
-      publisherOrError = PublisherOrError.ofPublisher(publisher);
+  private static class RunState {
+    private final Deque<ApiFuture<MessageMetadata>> futures = new ArrayDeque<>();
+
+    private final Publisher<MessageMetadata> publisher;
+
+    RunState(PublisherOptions options) {
+      publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
+    }
+
+    void publish(PubSubMessage message) {
+      futures.add(publisher.publish(Message.fromProto(message)));
+    }
+
+    void waitForDone() throws Exception {

Review comment:
       You might consider adding some logging, or wrapping the exceptions thrown here with more helpful messages (e.g. "timeout waiting for PubsubLite messages to be published"), rather than bubbling up all exceptions unchanged.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org