Posted to commits@beam.apache.org by ke...@apache.org on 2022/06/07 19:48:50 UTC
[beam] branch master updated: DataflowRunner: Experiment added to disable unbounded PCollection checks, allowing batch execution over unbounded PCollections (#16773)
This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e95ef975a3c DataflowRunner: Experiment added to disable unbounded PCollection checks, allowing batch execution over unbounded PCollections (#16773)
e95ef975a3c is described below
commit e95ef975a3cb2f2562493ff2e0943d3e000376dc
Author: Balázs Németh <nb...@users.noreply.github.com>
AuthorDate: Tue Jun 7 21:48:44 2022 +0200
DataflowRunner: Experiment added to disable unbounded PCollection checks, allowing batch execution over unbounded PCollections (#16773)
---
.../beam/runners/dataflow/DataflowRunner.java | 45 ++++++++++++--
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 15 ++++-
.../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 28 ++++++++-
.../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java | 69 +++++++++++++++++++++-
4 files changed, 144 insertions(+), 13 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index fbef0b8000c..459ad9340c9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -199,6 +199,10 @@ import org.slf4j.LoggerFactory;
})
public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
+ /** Experiment to "unsafely attempt to process unbounded data in batch mode". */
+ public static final String UNSAFELY_ATTEMPT_TO_PROCESS_UNBOUNDED_DATA_IN_BATCH_MODE =
+ "unsafely_attempt_to_process_unbounded_data_in_batch_mode";
+
private static final Logger LOG = LoggerFactory.getLogger(DataflowRunner.class);
/** Provided configuration options. */
private final DataflowPipelineOptions options;
@@ -1054,7 +1058,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
- if (containsUnboundedPCollection(pipeline)) {
+ if (shouldActAsStreaming(pipeline)) {
options.setStreaming(true);
}
@@ -1479,29 +1483,58 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// setup overrides.
@VisibleForTesting
protected void replaceV1Transforms(Pipeline pipeline) {
- boolean streaming = options.isStreaming() || containsUnboundedPCollection(pipeline);
+ boolean streaming = shouldActAsStreaming(pipeline);
// Ensure all outputs of all reads are consumed before potentially replacing any
// Read PTransforms
UnconsumedReads.ensureAllReadsConsumed(pipeline);
pipeline.replaceAll(getOverrides(streaming));
}
- private boolean containsUnboundedPCollection(Pipeline p) {
+ private boolean shouldActAsStreaming(Pipeline p) {
class BoundednessVisitor extends PipelineVisitor.Defaults {
- IsBounded boundedness = IsBounded.BOUNDED;
+ final List<PCollection> unboundedPCollections = new ArrayList<>();
@Override
public void visitValue(PValue value, Node producer) {
if (value instanceof PCollection) {
- boundedness = boundedness.and(((PCollection) value).isBounded());
+ PCollection pc = (PCollection) value;
+ if (pc.isBounded() == IsBounded.UNBOUNDED) {
+ unboundedPCollections.add(pc);
+ }
}
}
}
BoundednessVisitor visitor = new BoundednessVisitor();
p.traverseTopologically(visitor);
- return visitor.boundedness == IsBounded.UNBOUNDED;
+ if (visitor.unboundedPCollections.isEmpty()) {
+ if (options.isStreaming()) {
+ LOG.warn(
+ "No unbounded PCollection(s) found in a streaming pipeline! "
+ + "You might consider using 'streaming=false'!");
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ if (options.isStreaming()) {
+ return true;
+ } else if (hasExperiment(options, UNSAFELY_ATTEMPT_TO_PROCESS_UNBOUNDED_DATA_IN_BATCH_MODE)) {
+ LOG.info(
+ "Turning a batch pipeline into streaming due to unbounded PCollection(s) has been avoided! "
+ + "Unbounded PCollection(s): {}",
+ visitor.unboundedPCollections);
+ return false;
+ } else {
+ LOG.warn(
+ "Unbounded PCollection(s) found in a batch pipeline! "
+ + "You might consider using 'streaming=true'! "
+ + "Unbounded PCollection(s): {}",
+ visitor.unboundedPCollections);
+ return true;
+ }
+ }
};
/** Returns the DataflowPipelineTranslator associated with this object. */
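For reference, a minimal sketch of how a batch job would opt into the new experiment. The flag name comes from the UNSAFELY_ATTEMPT_TO_PROCESS_UNBOUNDED_DATA_IN_BATCH_MODE constant added above; the surrounding option setup is assumed boilerplate and is not part of this commit.

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UnboundedInBatchExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    // Stay in batch mode even though the pipeline will contain unbounded PCollections.
    options.setStreaming(false);
    ExperimentalOptions.addExperiment(
        options, DataflowRunner.UNSAFELY_ATTEMPT_TO_PROCESS_UNBOUNDED_DATA_IN_BATCH_MODE);

    Pipeline pipeline = Pipeline.create(options);
    // ... attach sources/transforms; with the experiment set, an unbounded PCollection
    // no longer flips the job to streaming, per shouldActAsStreaming() above ...
    pipeline.run().waitUntilFinish();
  }
}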
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 509aa17103e..82a6f30fcfa 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1445,6 +1445,9 @@ public class KafkaIO {
if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
readTransform = readTransform.commitOffsets();
}
+ if (kafkaRead.getStopReadTime() != null) {
+ readTransform = readTransform.withBounded();
+ }
PCollection<KafkaSourceDescriptor> output;
if (kafkaRead.isDynamicRead()) {
Set<String> topics = new HashSet<>();
@@ -1843,6 +1846,8 @@ public class KafkaIO {
abstract @Nullable TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+ abstract boolean isBounded();
+
abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
@AutoValue.Builder
@@ -1880,6 +1885,8 @@ public class KafkaIO {
abstract ReadSourceDescriptors.Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> policy);
+ abstract ReadSourceDescriptors.Builder<K, V> setBounded(boolean bounded);
+
abstract ReadSourceDescriptors<K, V> build();
}
@@ -1888,6 +1895,7 @@ public class KafkaIO {
.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
.setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
.setCommitOffsetEnabled(false)
+ .setBounded(false)
.build()
.withProcessingTime()
.withMonotonicallyIncreasingWatermarkEstimator();
@@ -2180,6 +2188,11 @@ public class KafkaIO {
.withManualWatermarkEstimator();
}
+ /** Enable treating the Kafka sources as bounded as opposed to the unbounded default. */
+ ReadSourceDescriptors<K, V> withBounded() {
+ return toBuilder().setBounded(true).build();
+ }
+
@Override
public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor> input) {
checkArgument(getKeyDeserializerProvider() != null, "withKeyDeserializer() is required");
@@ -2214,7 +2227,7 @@ public class KafkaIO {
try {
PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> outputWithDescriptor =
input
- .apply(ParDo.of(new ReadFromKafkaDoFn<K, V>(this)))
+ .apply(ParDo.of(ReadFromKafkaDoFn.<K, V>create(this)))
.setCoder(
KvCoder.of(
input
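For reference, a sketch of the KafkaIO usage this expansion change targets: a read with a stop read time now goes through the bounded ReadFromKafkaDoFn variant. withStopReadTime is assumed to be the public setter behind the getStopReadTime() checked above; broker, topic, and timestamp are placeholders, and "pipeline" is a Pipeline configured as in the previous sketch.

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Instant;

// A Kafka read that stops at a fixed timestamp; with this commit it expands into the
// @BoundedPerElement variant of ReadFromKafkaDoFn, so it can run as a batch job when
// combined with the experiment shown earlier.
PCollection<KafkaRecord<String, String>> records =
    pipeline.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")   // placeholder
            .withTopic("events")                   // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withStopReadTime(Instant.parse("2022-06-07T00:00:00Z")));  // assumed setter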
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 311a03351ab..b898af1ac6b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
@@ -137,15 +138,36 @@ import org.slf4j.LoggerFactory;
* stopping reading from removed {@link TopicPartition}, the stopping reading may not happen
* immediately.
*/
-@UnboundedPerElement
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
-class ReadFromKafkaDoFn<K, V>
+abstract class ReadFromKafkaDoFn<K, V>
extends DoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> {
- ReadFromKafkaDoFn(ReadSourceDescriptors transform) {
+ static <K, V> ReadFromKafkaDoFn<K, V> create(ReadSourceDescriptors transform) {
+ if (transform.isBounded()) {
+ return new Bounded<K, V>(transform);
+ } else {
+ return new Unbounded<K, V>(transform);
+ }
+ }
+
+ @UnboundedPerElement
+ private static class Unbounded<K, V> extends ReadFromKafkaDoFn<K, V> {
+ Unbounded(ReadSourceDescriptors transform) {
+ super(transform);
+ }
+ }
+
+ @BoundedPerElement
+ private static class Bounded<K, V> extends ReadFromKafkaDoFn<K, V> {
+ Bounded(ReadSourceDescriptors transform) {
+ super(transform);
+ }
+ }
+
+ private ReadFromKafkaDoFn(ReadSourceDescriptors transform) {
this.consumerConfig = transform.getConsumerConfig();
this.offsetConsumerConfig = transform.getOffsetConsumerConfig();
this.keyDeserializerProvider = transform.getKeyDeserializerProvider();
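For reference, a self-contained sketch of the pattern introduced here: a splittable DoFn whose per-element boundedness is fixed at construction time by picking one of two annotated subclasses, mirroring ReadFromKafkaDoFn.create(). All names (CountingSdf and its variants) are illustrative only and not part of this commit.

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

abstract class CountingSdf extends DoFn<Long, Long> {

  // Choose the annotated subclass at construction time, as ReadFromKafkaDoFn.create() does above.
  static CountingSdf create(boolean bounded) {
    return bounded ? new BoundedVariant() : new UnboundedVariant();
  }

  @BoundedPerElement
  private static class BoundedVariant extends CountingSdf {}

  @UnboundedPerElement
  private static class UnboundedVariant extends CountingSdf {}

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element Long upTo) {
    // One restriction per element: emit the offsets [0, upTo).
    return new OffsetRange(0, upTo);
  }

  @ProcessElement
  public ProcessContinuation processElement(
      RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Long> out) {
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      out.output(i);
    }
    return ProcessContinuation.stop();
  }
}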
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 0c0dc00b48c..80b1fac21b7 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -25,14 +25,27 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -52,6 +65,7 @@ import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -70,13 +84,13 @@ public class ReadFromKafkaDoFnTest {
new SimpleMockKafkaConsumer(OffsetResetStrategy.NONE, topicPartition);
private final ReadFromKafkaDoFn<String, String> dofnInstance =
- new ReadFromKafkaDoFn(makeReadSourceDescriptor(consumer));
+ ReadFromKafkaDoFn.create(makeReadSourceDescriptor(consumer));
private final ExceptionMockKafkaConsumer exceptionConsumer =
new ExceptionMockKafkaConsumer(OffsetResetStrategy.NONE, topicPartition);
private final ReadFromKafkaDoFn<String, String> exceptionDofnInstance =
- new ReadFromKafkaDoFn<>(makeReadSourceDescriptor(exceptionConsumer));
+ ReadFromKafkaDoFn.create(makeReadSourceDescriptor(exceptionConsumer));
private ReadSourceDescriptors<String, String> makeReadSourceDescriptor(
Consumer kafkaMockConsumer) {
@@ -417,7 +431,7 @@ public class ReadFromKafkaDoFnTest {
public void testProcessElementWhenTopicPartitionIsStopped() throws Exception {
MockOutputReceiver receiver = new MockOutputReceiver();
ReadFromKafkaDoFn<String, String> instance =
- new ReadFromKafkaDoFn(
+ ReadFromKafkaDoFn.create(
makeReadSourceDescriptor(consumer)
.toBuilder()
.setCheckStopReadingFn(
@@ -454,4 +468,53 @@ public class ReadFromKafkaDoFnTest {
null,
(OutputReceiver) receiver);
}
+
+ private static final TypeDescriptor<KafkaSourceDescriptor>
+ KAFKA_SOURCE_DESCRIPTOR_TYPE_DESCRIPTOR = new TypeDescriptor<KafkaSourceDescriptor>() {};
+
+ @Test
+ public void testBounded() {
+ BoundednessVisitor visitor = testBoundedness(rsd -> rsd.withBounded());
+ Assert.assertEquals(0, visitor.unboundedPCollections.size());
+ }
+
+ @Test
+ public void testUnbounded() {
+ BoundednessVisitor visitor = testBoundedness(rsd -> rsd);
+ Assert.assertNotEquals(0, visitor.unboundedPCollections.size());
+ }
+
+ private BoundednessVisitor testBoundedness(
+ Function<ReadSourceDescriptors<String, String>, ReadSourceDescriptors<String, String>>
+ readSourceDescriptorsDecorator) {
+ TestPipeline p = TestPipeline.create();
+ p.apply(Create.empty(KAFKA_SOURCE_DESCRIPTOR_TYPE_DESCRIPTOR))
+ .apply(
+ ParDo.of(
+ ReadFromKafkaDoFn.<String, String>create(
+ readSourceDescriptorsDecorator.apply(makeReadSourceDescriptor(consumer)))))
+ .setCoder(
+ KvCoder.of(
+ SerializableCoder.of(KafkaSourceDescriptor.class),
+ org.apache.beam.sdk.io.kafka.KafkaRecordCoder.of(
+ StringUtf8Coder.of(), StringUtf8Coder.of())));
+
+ BoundednessVisitor visitor = new BoundednessVisitor();
+ p.traverseTopologically(visitor);
+ return visitor;
+ }
+
+ static class BoundednessVisitor extends PipelineVisitor.Defaults {
+ final List<PCollection> unboundedPCollections = new ArrayList<>();
+
+ @Override
+ public void visitValue(PValue value, Node producer) {
+ if (value instanceof PCollection) {
+ PCollection pc = (PCollection) value;
+ if (pc.isBounded() == IsBounded.UNBOUNDED) {
+ unboundedPCollections.add(pc);
+ }
+ }
+ }
+ }
}