You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/08/15 16:44:44 UTC

[beam] branch master updated: Add a dataflow override for runnerv1 to still use SDF on runnerv2. (#22661)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 56d10f3f9cf Add a dataflow override for runnerv1 to still use SDF on runnerv2. (#22661)
56d10f3f9cf is described below

commit 56d10f3f9cf1292d214d585b59b4fee9d1bc8f3e
Author: dpcollins-google <40...@users.noreply.github.com>
AuthorDate: Mon Aug 15 12:44:36 2022 -0400

    Add a dataflow override for runnerv1 to still use SDF on runnerv2. (#22661)
    
    * Add a dataflow override for runnerv1 to still use SDF on runnerv2.
    
    Also change places where UnboundedReader can throw in normal operation to log info instead.
    
    * Add a dataflow override for runnerv1 to still use SDF on runnerv2.
    
    Also change places where UnboundedReader can throw in normal operation to log info instead.
    
    * Add a dataflow override for runnerv1 to still use SDF on runnerv2.
    
    Also change places where UnboundedReader can throw in normal operation to log info instead.
    
    * Add a dataflow override for runnerv1 to still use SDF on runnerv2.
    
    Also change places where UnboundedReader can throw in normal operation to log info instead.
    
    * Add a dataflow override for runnerv1 to still use SDF on runnerv2.
    
    Also change places where UnboundedReader can throw in normal operation to log info instead.
    
    * Add a dataflow override for runnerv1 to still use SDF on runnerv2.
    
    Also change places where UnboundedReader can throw in normal operation to give an explicitly harmless message.
    
    * Add a dataflow override for runnerv1 to still use SDF on runnerv2.
    
    Also change places where UnboundedReader can throw in normal operation to give an explicitly harmless message.
    
    * Add a dataflow override for runnerv1 to still use SDF on runnerv2.
    
    Also change places where UnboundedReader can throw in normal operation to give an explicitly harmless message.
---
 runners/google-cloud-dataflow-java/build.gradle    |  6 ++-
 .../beam/runners/dataflow/DataflowRunner.java      |  2 +
 .../pubsublite/internal/SubscribeTransform.java    | 46 +++++++++++++++++++++-
 .../pubsublite/internal/UnboundedReaderImpl.java   | 14 ++++---
 4 files changed, 61 insertions(+), 7 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index f3b5bbb7385..22f9d64de45 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -89,9 +89,13 @@ dependencies {
   implementation library.java.avro
   implementation library.java.bigdataoss_util
   implementation library.java.commons_codec
-  implementation library.java.flogger_system_backend // Avoids conflicts with bigdataoss_util (BEAM-11010)
+  // Avoids conflicts with bigdataoss_util (BEAM-11010)
+  implementation library.java.flogger_system_backend
   permitUnusedDeclared library.java.flogger_system_backend
   implementation library.java.google_api_client
+  // Ensures SequencedMessage availability for Spotless
+  implementation library.java.proto_google_cloud_pubsublite_v1
+  permitUnusedDeclared library.java.proto_google_cloud_pubsublite_v1
   implementation library.java.google_api_services_clouddebugger
   implementation library.java.google_api_services_dataflow
   implementation library.java.google_api_services_storage
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 424925a522b..bb3355c8c55 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -113,6 +113,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageId
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
+import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscribeTransform;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -535,6 +536,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       }
 
       overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
+      overridesBuilder.add(SubscribeTransform.V1_READ_OVERRIDE);
 
       if (!hasExperiment(options, "enable_file_dynamic_sharding")) {
         overridesBuilder.add(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
index e073cbc7ef7..493d7a72d7c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
@@ -29,17 +29,24 @@ import com.google.cloud.pubsublite.TopicPath;
 import com.google.cloud.pubsublite.internal.wire.Subscriber;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -180,8 +187,45 @@ public class SubscribeTransform extends PTransform<PBegin, PCollection<Sequenced
             new UnboundedSourceImpl(options, this::newBufferedSubscriber, this::newBacklogReader)));
   }
 
+  private static final class SourceTransform
+      extends PTransform<PBegin, PCollection<SequencedMessage>> {
+
+    private final SubscribeTransform impl;
+
+    private SourceTransform(SubscribeTransform impl) {
+      this.impl = impl;
+    }
+
+    @Override
+    public PCollection<SequencedMessage> expand(PBegin input) {
+      return impl.expandSource(input);
+    }
+  }
+
+  public static final PTransformOverride V1_READ_OVERRIDE =
+      PTransformOverride.of(
+          PTransformMatchers.classEqualTo(SubscribeTransform.class), new ReadOverrideFactory());
+
+  private static class ReadOverrideFactory
+      implements PTransformOverrideFactory<
+          PBegin, PCollection<SequencedMessage>, SubscribeTransform> {
+
+    @Override
+    public PTransformReplacement<PBegin, PCollection<SequencedMessage>> getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<SequencedMessage>, SubscribeTransform> transform) {
+      return PTransformReplacement.of(
+          transform.getPipeline().begin(), new SourceTransform(transform.getTransform()));
+    }
+
+    @Override
+    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<SequencedMessage> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
   @Override
   public PCollection<SequencedMessage> expand(PBegin input) {
-    return expandSource(input);
+    return expandSdf(input);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
index bbd7c4e649d..adabdfd782b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.java
@@ -23,6 +23,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 
 import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
 import java.io.IOException;
@@ -35,7 +36,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.joda.time.Instant;
 
 public class UnboundedReaderImpl extends UnboundedReader<SequencedMessage> {
-
   private final UnboundedSource<SequencedMessage, CheckpointMarkImpl> source;
   private final MemoryBufferedSubscriber subscriber;
   private final TopicBacklogReader backlogReader;
@@ -82,7 +82,7 @@ public class UnboundedReaderImpl extends UnboundedReader<SequencedMessage> {
         AutoCloseable c2 = committer;
         AutoCloseable c3 = asCloseable(subscriber)) {
     } catch (Exception e) {
-      throw new IOException(e);
+      throw new IOException("Failed when closing reader.", e);
     }
   }
 
@@ -99,9 +99,13 @@ public class UnboundedReaderImpl extends UnboundedReader<SequencedMessage> {
   @Override
   public boolean advance() throws IOException {
     if (!subscriber.state().equals(State.RUNNING)) {
-      throw new IOException(
-          "Subscriber failed. If the runner recently resized, and the error contains `A second subscriber connected`, this can be ignored.",
-          subscriber.failureCause());
+      Throwable t = subscriber.failureCause();
+      if ("DUPLICATE_SUBSCRIBER_CONNECTIONS"
+          .equals(ExtractStatus.getErrorInfoReason(ExtractStatus.toCanonical(t)))) {
+        throw new IOException(
+            "Partition reassigned to a different worker- this is expected and can be ignored.", t);
+      }
+      throw new IOException("Subscriber failed when trying to advance.", t);
     }
     if (advanced) {
       subscriber.pop();