You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/10 03:21:52 UTC
[beam] branch master updated: [BEAM-2939,
BEAM-9458] Use deduplication transform for UnboundedSources wrapped
as SDFs that need dedup
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b8149d3 [BEAM-2939, BEAM-9458] Use deduplication transform for UnboundedSources wrapped as SDFs that need dedup
new d84b80b Merge pull request #11068 from lukecwik/splittabledofn3
b8149d3 is described below
commit b8149d3c6e58c4daf18b1ad407da4009d40ecc5f
Author: Luke Cwik <lc...@google.com>
AuthorDate: Fri Mar 6 16:01:16 2020 -0800
[BEAM-2939, BEAM-9458] Use deduplication transform for UnboundedSources wrapped as SDFs that need dedup
Now that deduplication is supported, migrate Dataflow to use the UnboundedSource-as-SDF wrapper.
---
.../beam/runners/dataflow/DataflowRunner.java | 35 ++++++++++++----------
.../src/main/java/org/apache/beam/sdk/io/Read.java | 18 +++++------
2 files changed, 29 insertions(+), 24 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 710bf0d..8a7dc05 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -103,6 +103,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageId
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -433,21 +434,25 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PTransformMatchers.classEqualTo(Create.Values.class),
new StreamingFnApiCreateOverrideFactory()));
}
- overridesBuilder
- .add(
- PTransformOverride.of(
- PTransformMatchers.writeWithRunnerDeterminedSharding(),
- new StreamingShardedWriteFactory(options)))
- .add(
- // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
- // must precede it
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(Read.Bounded.class),
- new StreamingBoundedReadOverrideFactory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(Read.Unbounded.class),
- new StreamingUnboundedReadOverrideFactory()));
+ overridesBuilder.add(
+ PTransformOverride.of(
+ PTransformMatchers.writeWithRunnerDeterminedSharding(),
+ new StreamingShardedWriteFactory(options)));
+ if (!fnApiEnabled
+ || ExperimentalOptions.hasExperiment(options, "beam_fn_api_use_deprecated_read")) {
+ overridesBuilder
+ .add(
+ // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
+ // must precede it
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Read.Bounded.class),
+ new StreamingBoundedReadOverrideFactory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Read.Unbounded.class),
+ new StreamingUnboundedReadOverrideFactory()));
+ }
+
if (!fnApiEnabled) {
overridesBuilder.add(
PTransformOverride.of(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index b22855d..6c51686 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NoopCheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Deduplicate;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
import org.apache.beam.sdk.transforms.Impulse;
@@ -202,8 +203,7 @@ public class Read {
if (ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
&& !ExperimentalOptions.hasExperiment(
- input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
- && !source.requiresDeduping()) {
+ input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) {
// We don't use Create here since Create is defined as a BoundedSource and using it would
// cause an infinite expansion loop. We can reconsider this if Create is implemented
// directly as a SplittableDoFn.
@@ -222,13 +222,13 @@ public class Read {
new UnboundedSourceAsSDFWrapperFn<>(
(Coder<CheckpointMark>) source.getCheckpointMarkCoder())))
.setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder()));
- // TODO(BEAM-2939): Add support for deduplication.
- // if (source.requiresDeduping()) {
- // outputWithIds.apply(
- // Distinct.<ValueWithRecordId<T>, byte[]>withRepresentativeValueFn(
- // element -> element.getId())
- // .withRepresentativeType(TypeDescriptor.of(byte[].class)));
- // }
+
+ if (source.requiresDeduping()) {
+ outputWithIds.apply(
+ Deduplicate.<ValueWithRecordId<T>, byte[]>withRepresentativeValueFn(
+ element -> element.getId())
+ .withRepresentativeType(TypeDescriptor.of(byte[].class)));
+ }
return outputWithIds.apply(ParDo.of(new StripIdsDoFn<>()));
}