You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/08/15 16:44:44 UTC
[beam] branch master updated: Add a dataflow override for runnerv1 to still use SDF on runnerv2. (#22661)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 56d10f3f9cf Add a dataflow override for runnerv1 to still use SDF on runnerv2. (#22661)
56d10f3f9cf is described below
commit 56d10f3f9cf1292d214d585b59b4fee9d1bc8f3e
Author: dpcollins-google <40...@users.noreply.github.com>
AuthorDate: Mon Aug 15 12:44:36 2022 -0400
Add a dataflow override for runnerv1 to still use SDF on runnerv2. (#22661)
* Add a dataflow override for runnerv1 to still use SDF on runnerv2.
Also change places where UnboundedReader can throw in normal operation to log info instead.
* Add a dataflow override for runnerv1 to still use SDF on runnerv2.
Also change places where UnboundedReader can throw in normal operation to log info instead.
* Add a dataflow override for runnerv1 to still use SDF on runnerv2.
Also change places where UnboundedReader can throw in normal operation to log info instead.
* Add a dataflow override for runnerv1 to still use SDF on runnerv2.
Also change places where UnboundedReader can throw in normal operation to log info instead.
* Add a dataflow override for runnerv1 to still use SDF on runnerv2.
Also change places where UnboundedReader can throw in normal operation to log info instead.
* Add a dataflow override for runnerv1 to still use SDF on runnerv2.
Also change places where UnboundedReader can throw in normal operation to give an explicitly harmless message.
* Add a dataflow override for runnerv1 to still use SDF on runnerv2.
Also change places where UnboundedReader can throw in normal operation to give an explicitly harmless message.
* Add a dataflow override for runnerv1 to still use SDF on runnerv2.
Also change places where UnboundedReader can throw in normal operation to give an explicitly harmless message.
---
runners/google-cloud-dataflow-java/build.gradle | 6 ++-
.../beam/runners/dataflow/DataflowRunner.java | 2 +
.../pubsublite/internal/SubscribeTransform.java | 46 +++++++++++++++++++++-
.../pubsublite/internal/UnboundedReaderImpl.java | 14 ++++---
4 files changed, 61 insertions(+), 7 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index f3b5bbb7385..22f9d64de45 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -89,9 +89,13 @@ dependencies {
implementation library.java.avro
implementation library.java.bigdataoss_util
implementation library.java.commons_codec
- implementation library.java.flogger_system_backend // Avoids conflicts with bigdataoss_util (BEAM-11010)
+ // Avoids conflicts with bigdataoss_util (BEAM-11010)
+ implementation library.java.flogger_system_backend
permitUnusedDeclared library.java.flogger_system_backend
implementation library.java.google_api_client
+ // Ensures SequencedMessage availability for Spotless
+ implementation library.java.proto_google_cloud_pubsublite_v1
+ permitUnusedDeclared library.java.proto_google_cloud_pubsublite_v1
implementation library.java.google_api_services_clouddebugger
implementation library.java.google_api_services_dataflow
implementation library.java.google_api_services_storage
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 424925a522b..bb3355c8c55 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -113,6 +113,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageId
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
+import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscribeTransform;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -535,6 +536,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
+ overridesBuilder.add(SubscribeTransform.V1_READ_OVERRIDE);
if (!hasExperiment(options, "enable_file_dynamic_sharding")) {
overridesBuilder.add(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
index e073cbc7ef7..493d7a72d7c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
@@ -29,17 +29,24 @@ import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.util.List;
+import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -180,8 +187,45 @@ public class SubscribeTransform extends PTransform<PBegin, PCollection<Sequenced
new UnboundedSourceImpl(options, this::newBufferedSubscriber, this::newBacklogReader)));
}
+ private static final class SourceTransform
+ extends PTransform<PBegin, PCollection<SequencedMessage>> {
+
+ private final SubscribeTransform impl;
+
+ private SourceTransform(SubscribeTransform impl) {
+ this.impl = impl;
+ }
+
+ @Override
+ public PCollection<SequencedMessage> expand(PBegin input) {
+ return impl.expandSource(input);
+ }
+ }
+
+ public static final PTransformOverride V1_READ_OVERRIDE =
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(SubscribeTransform.class), new ReadOverrideFactory());
+
+ private static class ReadOverrideFactory
+ implements PTransformOverrideFactory<
+ PBegin, PCollection<SequencedMessage>, SubscribeTransform> {
+
+ @Override
+ public PTransformReplacement<PBegin, PCollection<SequencedMessage>> getReplacementTransform(
+ AppliedPTransform<PBegin, PCollection<SequencedMessage>, SubscribeTransform> transform) {
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(), new SourceTransform(transform.getTransform()));
+ }
+
+ @Override
+ public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PCollection<?>> outputs, PCollection<SequencedMessage> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
@Override
public PCollection<SequencedMessage> expand(PBegin input) {
- return expandSource(input);
+ return expandSdf(input);
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
index bbd7c4e649d..adabdfd782b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
@@ -23,6 +23,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import com.google.api.core.ApiService.State;
import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.util.Timestamps;
import java.io.IOException;
@@ -35,7 +36,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
public class UnboundedReaderImpl extends UnboundedReader<SequencedMessage> {
-
private final UnboundedSource<SequencedMessage, CheckpointMarkImpl> source;
private final MemoryBufferedSubscriber subscriber;
private final TopicBacklogReader backlogReader;
@@ -82,7 +82,7 @@ public class UnboundedReaderImpl extends UnboundedReader<SequencedMessage> {
AutoCloseable c2 = committer;
AutoCloseable c3 = asCloseable(subscriber)) {
} catch (Exception e) {
- throw new IOException(e);
+ throw new IOException("Failed when closing reader.", e);
}
}
@@ -99,9 +99,13 @@ public class UnboundedReaderImpl extends UnboundedReader<SequencedMessage> {
@Override
public boolean advance() throws IOException {
if (!subscriber.state().equals(State.RUNNING)) {
- throw new IOException(
- "Subscriber failed. If the runner recently resized, and the error contains `A second subscriber connected`, this can be ignored.",
- subscriber.failureCause());
+ Throwable t = subscriber.failureCause();
+ if ("DUPLICATE_SUBSCRIBER_CONNECTIONS"
+ .equals(ExtractStatus.getErrorInfoReason(ExtractStatus.toCanonical(t)))) {
+ throw new IOException(
+ "Partition reassigned to a different worker- this is expected and can be ignored.", t);
+ }
+ throw new IOException("Subscriber failed when trying to advance.", t);
}
if (advanced) {
subscriber.pop();