You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2021/12/28 21:19:21 UTC

[beam] branch master updated: [BEAM-13402] Simplify PubsubLiteSink

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dbbd09  [BEAM-13402] Simplify PubsubLiteSink
     new b33bebc  Merge pull request #16215 from dpcollins-google/publish-no-desync
7dbbd09 is described below

commit 7dbbd097585017c7a655afc041435123745086dc
Author: Daniel Collins <dp...@google.com>
AuthorDate: Sun Dec 12 23:54:02 2021 -0500

    [BEAM-13402] Simplify PubsubLiteSink
    
    This also avoids indefinite waits, so the sink is no longer affected by futures that never terminate.
---
 .../io/gcp/pubsublite/internal/PublisherCache.java |   5 +
 .../gcp/pubsublite/internal/PublisherOrError.java  |  49 ----------
 .../io/gcp/pubsublite/internal/PubsubLiteSink.java | 107 +++++----------------
 .../pubsublite/internal/PubsubLiteSinkTest.java    |  46 ---------
 4 files changed, 29 insertions(+), 178 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java
index 76b69b3..ac85ba9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java
@@ -27,9 +27,13 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
 import java.util.HashMap;
 import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A map of working publishers by PublisherOptions. */
 class PublisherCache implements AutoCloseable {
+  private final Logger logger = LoggerFactory.getLogger(PublisherCache.class);
+
   @GuardedBy("this")
   private final HashMap<PublisherOptions, Publisher<MessageMetadata>> livePublishers =
       new HashMap<>();
@@ -49,6 +53,7 @@ class PublisherCache implements AutoCloseable {
         new Listener() {
           @Override
           public void failed(State s, Throwable t) {
+            logger.warn("Publisher failed.", t);
             evict(options);
           }
         },
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherOrError.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherOrError.java
deleted file mode 100644
index 7eb1c66..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherOrError.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite.internal;
-
-import com.google.auto.value.AutoOneOf;
-import com.google.cloud.pubsublite.MessageMetadata;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.Publisher;
-
-/** A helper representing either a Publisher or an error. */
-@AutoOneOf(PublisherOrError.Kind.class)
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-abstract class PublisherOrError {
-  enum Kind {
-    PUBLISHER,
-    ERROR
-  }
-
-  abstract Kind getKind();
-
-  abstract Publisher<MessageMetadata> publisher();
-
-  abstract CheckedApiException error();
-
-  static PublisherOrError ofPublisher(Publisher<MessageMetadata> p) {
-    return AutoOneOf_PublisherOrError.publisher(p);
-  }
-
-  static PublisherOrError ofError(CheckedApiException e) {
-    return AutoOneOf_PublisherOrError.error(e);
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
index 4b666d2..f370919 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
@@ -17,25 +17,20 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
+
 import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutureCallback;
 import com.google.api.core.ApiFutures;
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.Message;
 import com.google.cloud.pubsublite.MessageMetadata;
 import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
 import com.google.cloud.pubsublite.internal.Publisher;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
 import com.google.cloud.pubsublite.proto.PubSubMessage;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import java.util.ArrayDeque;
 import java.util.Deque;
-import java.util.function.Consumer;
 import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
-import org.apache.beam.sdk.io.gcp.pubsublite.internal.PublisherOrError.Kind;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /** A sink which publishes messages to Pub/Sub Lite. */
@@ -46,98 +41,44 @@ public class PubsubLiteSink extends DoFn<PubSubMessage, Void> {
   private final PublisherOptions options;
 
   @GuardedBy("this")
-  private transient PublisherOrError publisherOrError;
-
-  // Whenever outstanding is decremented, notify() must be called.
-  @GuardedBy("this")
-  private transient int outstanding;
-
-  @GuardedBy("this")
-  private transient Deque<CheckedApiException> errorsSinceLastFinish;
+  private transient RunState runState;
 
   public PubsubLiteSink(PublisherOptions options) {
     this.options = options;
   }
 
-  @Setup
-  public void setup() throws ApiException {
-    Publisher<MessageMetadata> publisher;
-    publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
-    synchronized (this) {
-      outstanding = 0;
-      errorsSinceLastFinish = new ArrayDeque<>();
-      publisherOrError = PublisherOrError.ofPublisher(publisher);
+  private static class RunState {
+    private final Deque<ApiFuture<MessageMetadata>> futures = new ArrayDeque<>();
+
+    private final Publisher<MessageMetadata> publisher;
+
+    RunState(PublisherOptions options) {
+      publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
+    }
+
+    void publish(PubSubMessage message) {
+      futures.add(publisher.publish(Message.fromProto(message)));
+    }
+
+    void waitForDone() throws Exception {
+      ApiFutures.allAsList(futures).get(1, MINUTES);
     }
-    // cannot declare in inner class since 'this' means something different.
-    Consumer<Throwable> onFailure =
-        t -> {
-          synchronized (this) {
-            publisherOrError = PublisherOrError.ofError(ExtractStatus.toCanonical(t));
-          }
-        };
-    publisher.addListener(
-        new Listener() {
-          @Override
-          public void failed(State s, Throwable t) {
-            onFailure.accept(t);
-          }
-        },
-        SystemExecutors.getFuturesExecutor());
   }
 
-  private synchronized void decrementOutstanding() {
-    --outstanding;
-    notify();
+  @StartBundle
+  public synchronized void startBundle() throws ApiException {
+    runState = new RunState(options);
   }
 
   @ProcessElement
   public synchronized void processElement(@Element PubSubMessage message)
       throws CheckedApiException {
-    ++outstanding;
-    if (publisherOrError.getKind() == Kind.ERROR) {
-      throw publisherOrError.error();
-    }
-    ApiFuture<MessageMetadata> future =
-        publisherOrError.publisher().publish(Message.fromProto(message));
-    // cannot declare in inner class since 'this' means something different.
-    Consumer<Throwable> onFailure =
-        t -> {
-          synchronized (this) {
-            decrementOutstanding();
-            errorsSinceLastFinish.push(ExtractStatus.toCanonical(t));
-          }
-        };
-    ApiFutures.addCallback(
-        future,
-        new ApiFutureCallback<MessageMetadata>() {
-          @Override
-          public void onSuccess(MessageMetadata messageMetadata) {
-            decrementOutstanding();
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            onFailure.accept(t);
-          }
-        },
-        SystemExecutors.getFuturesExecutor());
+    runState.publish(message);
   }
 
   // Intentionally don't flush on bundle finish to allow multi-sink client reuse.
   @FinishBundle
-  public synchronized void finishBundle() throws CheckedApiException, InterruptedException {
-    while (outstanding > 0) {
-      wait();
-    }
-    if (!errorsSinceLastFinish.isEmpty()) {
-      CheckedApiException canonical = errorsSinceLastFinish.pop();
-      while (!errorsSinceLastFinish.isEmpty()) {
-        canonical.addSuppressed(errorsSinceLastFinish.pop());
-      }
-      throw canonical;
-    }
-    if (publisherOrError.getKind() == Kind.ERROR) {
-      throw publisherOrError.error();
-    }
+  public synchronized void finishBundle() throws Exception {
+    runState.waitForDone();
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSinkTest.java
index df638ff..5c4e731 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSinkTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSinkTest.java
@@ -23,14 +23,11 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.api.core.ApiFutures;
-import com.google.api.core.ApiService;
 import com.google.api.core.SettableApiFuture;
 import com.google.api.gax.rpc.StatusCode.Code;
 import com.google.cloud.pubsublite.CloudRegion;
@@ -52,7 +49,6 @@ import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
@@ -68,7 +64,6 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.MockitoAnnotations;
 import org.mockito.Spy;
-import org.mockito.stubbing.Answer;
 
 @RunWith(JUnit4.class)
 public class PubsubLiteSinkTest {
@@ -92,9 +87,6 @@ public class PubsubLiteSinkTest {
 
   private final PubsubLiteSink sink = new PubsubLiteSink(defaultOptions());
 
-  // Initialized in setUp.
-  private ApiService.Listener listener;
-
   @Captor
   final ArgumentCaptor<Message> publishedMessageCaptor = ArgumentCaptor.forClass(Message.class);
 
@@ -110,16 +102,6 @@ public class PubsubLiteSinkTest {
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
     PerServerPublisherCache.PUBLISHER_CACHE.set(defaultOptions(), publisher);
-    doAnswer(
-            (Answer<Void>)
-                args -> {
-                  listener = args.getArgument(0);
-                  return null;
-                })
-        .when(publisher)
-        .addListener(any(), any());
-    sink.setup();
-    verify(publisher).addListener(any(), any());
   }
 
   @Test
@@ -207,32 +189,4 @@ public class PubsubLiteSinkTest {
     assertThat(statusOr.get().code(), equalTo(Code.INTERNAL));
     exec.shutdownNow();
   }
-
-  @Test
-  public void listenerExceptionOnBundleFinish() throws Exception {
-    Message message1 = Message.builder().build();
-    SettableApiFuture<MessageMetadata> future = SettableApiFuture.create();
-
-    SettableApiFuture<Void> publishFuture = SettableApiFuture.create();
-    when(publisher.publish(message1))
-        .thenAnswer(
-            args -> {
-              publishFuture.set(null);
-              return future;
-            });
-    Future<?> executorFuture =
-        Executors.newSingleThreadExecutor()
-            .submit(
-                () -> {
-                  PipelineExecutionException e =
-                      assertThrows(PipelineExecutionException.class, () -> runWith(message1));
-                  Optional<CheckedApiException> statusOr = ExtractStatus.extract(e.getCause());
-                  assertTrue(statusOr.isPresent());
-                  assertThat(statusOr.get().code(), equalTo(Code.INTERNAL));
-                });
-    publishFuture.get();
-    listener.failed(null, new CheckedApiException(Code.INTERNAL).underlying);
-    future.set(MessageMetadata.of(Partition.of(1), Offset.of(2)));
-    executorFuture.get();
-  }
 }