You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2021/12/28 21:19:21 UTC
[beam] branch master updated: [BEAM-13402] Simplify PubsubLiteSink
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7dbbd09 [BEAM-13402] Simplify PubsubLiteSink
new b33bebc Merge pull request #16215 from dpcollins-google/publish-no-desync
7dbbd09 is described below
commit 7dbbd097585017c7a655afc041435123745086dc
Author: Daniel Collins <dp...@google.com>
AuthorDate: Sun Dec 12 23:54:02 2021 -0500
[BEAM-13402] Simplify PubsubLiteSink
This also prevents the sink from hanging when publish futures never complete, because it no longer waits on them indefinitely.
---
.../io/gcp/pubsublite/internal/PublisherCache.java | 5 +
.../gcp/pubsublite/internal/PublisherOrError.java | 49 ----------
.../io/gcp/pubsublite/internal/PubsubLiteSink.java | 107 +++++----------------
.../pubsublite/internal/PubsubLiteSinkTest.java | 46 ---------
4 files changed, 29 insertions(+), 178 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java
index 76b69b3..ac85ba9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java
@@ -27,9 +27,13 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.HashMap;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** A map of working publishers by PublisherOptions. */
class PublisherCache implements AutoCloseable {
+ private final Logger logger = LoggerFactory.getLogger(PublisherCache.class);
+
@GuardedBy("this")
private final HashMap<PublisherOptions, Publisher<MessageMetadata>> livePublishers =
new HashMap<>();
@@ -49,6 +53,7 @@ class PublisherCache implements AutoCloseable {
new Listener() {
@Override
public void failed(State s, Throwable t) {
+ logger.warn("Publisher failed.", t);
evict(options);
}
},
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherOrError.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherOrError.java
deleted file mode 100644
index 7eb1c66..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherOrError.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite.internal;
-
-import com.google.auto.value.AutoOneOf;
-import com.google.cloud.pubsublite.MessageMetadata;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.Publisher;
-
-/** A helper representing either a Publisher or an error. */
-@AutoOneOf(PublisherOrError.Kind.class)
-@SuppressWarnings({
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-abstract class PublisherOrError {
- enum Kind {
- PUBLISHER,
- ERROR
- }
-
- abstract Kind getKind();
-
- abstract Publisher<MessageMetadata> publisher();
-
- abstract CheckedApiException error();
-
- static PublisherOrError ofPublisher(Publisher<MessageMetadata> p) {
- return AutoOneOf_PublisherOrError.publisher(p);
- }
-
- static PublisherOrError ofError(CheckedApiException e) {
- return AutoOneOf_PublisherOrError.error(e);
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
index 4b666d2..f370919 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
@@ -17,25 +17,20 @@
*/
package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+import static java.util.concurrent.TimeUnit.MINUTES;
+
import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
-import java.util.function.Consumer;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
-import org.apache.beam.sdk.io.gcp.pubsublite.internal.PublisherOrError.Kind;
import org.apache.beam.sdk.transforms.DoFn;
/** A sink which publishes messages to Pub/Sub Lite. */
@@ -46,98 +41,44 @@ public class PubsubLiteSink extends DoFn<PubSubMessage, Void> {
private final PublisherOptions options;
@GuardedBy("this")
- private transient PublisherOrError publisherOrError;
-
- // Whenever outstanding is decremented, notify() must be called.
- @GuardedBy("this")
- private transient int outstanding;
-
- @GuardedBy("this")
- private transient Deque<CheckedApiException> errorsSinceLastFinish;
+ private transient RunState runState;
public PubsubLiteSink(PublisherOptions options) {
this.options = options;
}
- @Setup
- public void setup() throws ApiException {
- Publisher<MessageMetadata> publisher;
- publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
- synchronized (this) {
- outstanding = 0;
- errorsSinceLastFinish = new ArrayDeque<>();
- publisherOrError = PublisherOrError.ofPublisher(publisher);
+ private static class RunState {
+ private final Deque<ApiFuture<MessageMetadata>> futures = new ArrayDeque<>();
+
+ private final Publisher<MessageMetadata> publisher;
+
+ RunState(PublisherOptions options) {
+ publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
+ }
+
+ void publish(PubSubMessage message) {
+ futures.add(publisher.publish(Message.fromProto(message)));
+ }
+
+ void waitForDone() throws Exception {
+ ApiFutures.allAsList(futures).get(1, MINUTES);
}
- // cannot declare in inner class since 'this' means something different.
- Consumer<Throwable> onFailure =
- t -> {
- synchronized (this) {
- publisherOrError = PublisherOrError.ofError(ExtractStatus.toCanonical(t));
- }
- };
- publisher.addListener(
- new Listener() {
- @Override
- public void failed(State s, Throwable t) {
- onFailure.accept(t);
- }
- },
- SystemExecutors.getFuturesExecutor());
}
- private synchronized void decrementOutstanding() {
- --outstanding;
- notify();
+ @StartBundle
+ public synchronized void startBundle() throws ApiException {
+ runState = new RunState(options);
}
@ProcessElement
public synchronized void processElement(@Element PubSubMessage message)
throws CheckedApiException {
- ++outstanding;
- if (publisherOrError.getKind() == Kind.ERROR) {
- throw publisherOrError.error();
- }
- ApiFuture<MessageMetadata> future =
- publisherOrError.publisher().publish(Message.fromProto(message));
- // cannot declare in inner class since 'this' means something different.
- Consumer<Throwable> onFailure =
- t -> {
- synchronized (this) {
- decrementOutstanding();
- errorsSinceLastFinish.push(ExtractStatus.toCanonical(t));
- }
- };
- ApiFutures.addCallback(
- future,
- new ApiFutureCallback<MessageMetadata>() {
- @Override
- public void onSuccess(MessageMetadata messageMetadata) {
- decrementOutstanding();
- }
-
- @Override
- public void onFailure(Throwable t) {
- onFailure.accept(t);
- }
- },
- SystemExecutors.getFuturesExecutor());
+ runState.publish(message);
}
// Intentionally don't flush on bundle finish to allow multi-sink client reuse.
@FinishBundle
- public synchronized void finishBundle() throws CheckedApiException, InterruptedException {
- while (outstanding > 0) {
- wait();
- }
- if (!errorsSinceLastFinish.isEmpty()) {
- CheckedApiException canonical = errorsSinceLastFinish.pop();
- while (!errorsSinceLastFinish.isEmpty()) {
- canonical.addSuppressed(errorsSinceLastFinish.pop());
- }
- throw canonical;
- }
- if (publisherOrError.getKind() == Kind.ERROR) {
- throw publisherOrError.error();
- }
+ public synchronized void finishBundle() throws Exception {
+ runState.waitForDone();
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSinkTest.java
index df638ff..5c4e731 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSinkTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSinkTest.java
@@ -23,14 +23,11 @@ import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.api.core.ApiFutures;
-import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.CloudRegion;
@@ -52,7 +49,6 @@ import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
@@ -68,7 +64,6 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
-import org.mockito.stubbing.Answer;
@RunWith(JUnit4.class)
public class PubsubLiteSinkTest {
@@ -92,9 +87,6 @@ public class PubsubLiteSinkTest {
private final PubsubLiteSink sink = new PubsubLiteSink(defaultOptions());
- // Initialized in setUp.
- private ApiService.Listener listener;
-
@Captor
final ArgumentCaptor<Message> publishedMessageCaptor = ArgumentCaptor.forClass(Message.class);
@@ -110,16 +102,6 @@ public class PubsubLiteSinkTest {
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
PerServerPublisherCache.PUBLISHER_CACHE.set(defaultOptions(), publisher);
- doAnswer(
- (Answer<Void>)
- args -> {
- listener = args.getArgument(0);
- return null;
- })
- .when(publisher)
- .addListener(any(), any());
- sink.setup();
- verify(publisher).addListener(any(), any());
}
@Test
@@ -207,32 +189,4 @@ public class PubsubLiteSinkTest {
assertThat(statusOr.get().code(), equalTo(Code.INTERNAL));
exec.shutdownNow();
}
-
- @Test
- public void listenerExceptionOnBundleFinish() throws Exception {
- Message message1 = Message.builder().build();
- SettableApiFuture<MessageMetadata> future = SettableApiFuture.create();
-
- SettableApiFuture<Void> publishFuture = SettableApiFuture.create();
- when(publisher.publish(message1))
- .thenAnswer(
- args -> {
- publishFuture.set(null);
- return future;
- });
- Future<?> executorFuture =
- Executors.newSingleThreadExecutor()
- .submit(
- () -> {
- PipelineExecutionException e =
- assertThrows(PipelineExecutionException.class, () -> runWith(message1));
- Optional<CheckedApiException> statusOr = ExtractStatus.extract(e.getCause());
- assertTrue(statusOr.isPresent());
- assertThat(statusOr.get().code(), equalTo(Code.INTERNAL));
- });
- publishFuture.get();
- listener.failed(null, new CheckedApiException(Code.INTERNAL).underlying);
- future.set(MessageMetadata.of(Partition.of(1), Offset.of(2)));
- executorFuture.get();
- }
}