You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/10 03:21:52 UTC

[beam] branch master updated: [BEAM-2939, BEAM-9458] Use deduplication transform for UnboundedSources wrapped as SDFs that need dedup

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b8149d3  [BEAM-2939, BEAM-9458] Use deduplication transform for UnboundedSources wrapped as SDFs that need dedup
     new d84b80b  Merge pull request #11068 from lukecwik/splittabledofn3
b8149d3 is described below

commit b8149d3c6e58c4daf18b1ad407da4009d40ecc5f
Author: Luke Cwik <lc...@google.com>
AuthorDate: Fri Mar 6 16:01:16 2020 -0800

    [BEAM-2939, BEAM-9458] Use deduplication transform for UnboundedSources wrapped as SDFs that need dedup
    
    Now that dedup is supported, migrate Dataflow to use UnboundedSources as SDF wrapper.
---
 .../beam/runners/dataflow/DataflowRunner.java      | 35 ++++++++++++----------
 .../src/main/java/org/apache/beam/sdk/io/Read.java | 18 +++++------
 2 files changed, 29 insertions(+), 24 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 710bf0d..8a7dc05 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -103,6 +103,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageId
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -433,21 +434,25 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                 PTransformMatchers.classEqualTo(Create.Values.class),
                 new StreamingFnApiCreateOverrideFactory()));
       }
-      overridesBuilder
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.writeWithRunnerDeterminedSharding(),
-                  new StreamingShardedWriteFactory(options)))
-          .add(
-              // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
-              // must precede it
-              PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(Read.Bounded.class),
-                  new StreamingBoundedReadOverrideFactory()))
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(Read.Unbounded.class),
-                  new StreamingUnboundedReadOverrideFactory()));
+      overridesBuilder.add(
+          PTransformOverride.of(
+              PTransformMatchers.writeWithRunnerDeterminedSharding(),
+              new StreamingShardedWriteFactory(options)));
+      if (!fnApiEnabled
+          || ExperimentalOptions.hasExperiment(options, "beam_fn_api_use_deprecated_read")) {
+        overridesBuilder
+            .add(
+                // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
+                // must precede it
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(Read.Bounded.class),
+                    new StreamingBoundedReadOverrideFactory()))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(Read.Unbounded.class),
+                    new StreamingUnboundedReadOverrideFactory()));
+      }
+
       if (!fnApiEnabled) {
         overridesBuilder.add(
             PTransformOverride.of(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index b22855d..6c51686 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NoopCheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Deduplicate;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
 import org.apache.beam.sdk.transforms.Impulse;
@@ -202,8 +203,7 @@ public class Read {
 
       if (ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
           && !ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
-          && !source.requiresDeduping()) {
+              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) {
         // We don't use Create here since Create is defined as a BoundedSource and using it would
         // cause an infinite expansion loop. We can reconsider this if Create is implemented
         // directly as a SplittableDoFn.
@@ -222,13 +222,13 @@ public class Read {
                         new UnboundedSourceAsSDFWrapperFn<>(
                             (Coder<CheckpointMark>) source.getCheckpointMarkCoder())))
                 .setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder()));
-        // TODO(BEAM-2939): Add support for deduplication.
-        // if (source.requiresDeduping()) {
-        //   outputWithIds.apply(
-        //       Distinct.<ValueWithRecordId<T>, byte[]>withRepresentativeValueFn(
-        //               element -> element.getId())
-        //           .withRepresentativeType(TypeDescriptor.of(byte[].class)));
-        // }
+
+        if (source.requiresDeduping()) {
+          outputWithIds.apply(
+              Deduplicate.<ValueWithRecordId<T>, byte[]>withRepresentativeValueFn(
+                      element -> element.getId())
+                  .withRepresentativeType(TypeDescriptor.of(byte[].class)));
+        }
         return outputWithIds.apply(ParDo.of(new StripIdsDoFn<>()));
       }