You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/08/15 20:15:05 UTC
[beam] branch master updated: [BEAM-10670] Make Read execute as a
splittable DoFn by default for the Java DirectRunner.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2e7c4ca [BEAM-10670] Make Read execute as a splittable DoFn by default for the Java DirectRunner.
new c720f2a Merge pull request #12519 from lukecwik/beam10670
2e7c4ca is described below
commit 2e7c4ca207074ee98568be9469cadd141ad9cc6d
Author: Luke Cwik <lc...@google.com>
AuthorDate: Fri Aug 14 13:48:47 2020 -0700
[BEAM-10670] Make Read execute as a splittable DoFn by default for the Java DirectRunner.
---
CHANGES.md | 28 ++
.../runners/core/construction/SplittableParDo.java | 47 ++
.../core/construction/SplittableParDoTest.java | 81 ++++
.../apache/beam/runners/direct/DirectRunner.java | 1 +
.../direct/ImmutabilityEnforcementFactory.java | 58 +++
.../direct/BoundedReadEvaluatorFactoryTest.java | 10 +-
.../runners/direct/DirectGraphVisitorTest.java | 27 +-
.../beam/runners/direct/DirectRunnerTest.java | 1 -
.../beam/runners/direct/EvaluationContextTest.java | 37 +-
.../direct/UnboundedReadEvaluatorFactoryTest.java | 34 +-
.../beam/runners/direct/WatermarkManagerTest.java | 505 +++++++++++----------
.../src/main/java/org/apache/beam/sdk/io/Read.java | 37 +-
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 27 --
.../org/apache/beam/sdk/io/TFRecordIOTest.java | 4 -
.../org/apache/beam/sdk/io/TextIOReadTest.java | 26 --
.../sdk/io/gcp/bigquery/BigQueryIOReadTest.java | 41 --
.../gcp/bigquery/BigQueryIOStorageQueryTest.java | 11 -
.../io/gcp/bigquery/BigQueryIOStorageReadTest.java | 15 -
.../beam/sdk/io/gcp/bigtable/BigtableIOTest.java | 30 --
.../sdk/io/gcp/pubsub/PubsubIOExternalTest.java | 8 +-
20 files changed, 565 insertions(+), 463 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index a4882b3..f310621 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -47,6 +47,34 @@
* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-->
+# [2.25.0] - Unreleased
+
+## Highlights
+
+* Splittable DoFn is now the default way Read executes on Java-based runners (DirectRunner); opt out using `--experiments=use_deprecated_read`. For all other runners, users can opt in using `--experiments=use_sdf_read`. (Java) ([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670))
+* New highly anticipated feature X added to Python SDK ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).
+
+## I/Os
+
+* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## New Features / Improvements
+
+* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## Breaking Changes
+
+* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## Deprecations
+
+* X behavior is deprecated and will be removed in X versions ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## Known Issues
+
+* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
# [2.24.0] - Unreleased
## Highlights
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 3187112..acece88 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -35,11 +36,16 @@ import org.apache.beam.runners.core.construction.PTransformTranslation.Transform
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoLike;
import org.apache.beam.runners.core.construction.ReadTranslation.BoundedReadPayloadTranslator;
import org.apache.beam.runners.core.construction.ReadTranslation.UnboundedReadPayloadTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.PTransform;
@@ -630,4 +636,45 @@ public class SplittableParDo<InputT, OutputT, RestrictionT, WatermarkEstimatorSt
invoker = null;
}
}
+
+ /**
+ * Throws an {@link IllegalArgumentException} if the pipeline contains any primitive read
+ * transforms that have not been expanded to be executed as {@link DoFn splittable DoFns}. The
+ * check is skipped when either the {@code use_deprecated_read} or the
+ * {@code beam_fn_api_use_deprecated_read} experiment is specified.
+ */
+ public static void validateNoPrimitiveReads(Pipeline pipeline) {
+ // TODO(BEAM-10670): Remove the deprecated Read and make the splittable DoFn the only option.
+ if (!(ExperimentalOptions.hasExperiment(
+ pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
+ || ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read"))) {
+ pipeline.traverseTopologically(new ValidateNoPrimitiveReads());
+ }
+ }
+
+ /**
+ * A {@link PipelineVisitor} that collects every primitive {@link Read.Bounded} or {@link
+ * Read.Unbounded} transform and, when leaving the pipeline, throws if any were found.
+ */
+ private static class ValidateNoPrimitiveReads extends PipelineVisitor.Defaults {
+ private final List<PTransform<?, ?>> foundPrimitiveReads = new ArrayList<>();
+
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ if (node.getTransform() instanceof Read.Bounded
+ || node.getTransform() instanceof Read.Unbounded) {
+ foundPrimitiveReads.add(node.getTransform());
+ }
+ }
+
+ @Override
+ public void leavePipeline(Pipeline pipeline) {
+ if (!foundPrimitiveReads.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Found primitive read transforms %s within the pipeline when only Splittable DoFns were expected. If you would like to use the deprecated behavior, please specify the experiment 'use_deprecated_read'. For example '--experiments=use_deprecated_read' on the command line.",
+ foundPrimitiveReads));
+ }
+ }
+ }
}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index 55c2dbf..65715ac 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -19,10 +19,23 @@ package org.apache.beam.runners.core.construction;
import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import java.io.IOException;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -167,4 +180,72 @@ public class SplittableParDoTest {
"unbounded to unbounded", makeUnboundedCollection(pipeline), unboundedFn)
.isBounded());
}
+
+ /**
+ * A {@link BoundedSource} stub used only to build pipelines: it never splits, reports an estimated
+ * size of 0, and throws {@link UnsupportedOperationException} if a reader is requested.
+ */
+ private static class FakeBoundedSource extends BoundedSource<String> {
+ @Override
+ public List<? extends BoundedSource<String>> split(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ return Collections.singletonList(this);
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Coder<String> getOutputCoder() {
+ return StringUtf8Coder.of();
+ }
+ }
+
+ /** With {@code use_deprecated_read} set, validation must not throw even though Reads are present. */
+ @Test
+ public void testValidateNoPrimitiveReadsIsSkippedWhenUsingDeprecatedRead() {
+ PipelineOptions deprecatedReadOptions = PipelineOptionsFactory.create();
+ deprecatedReadOptions.setRunner(CrashingRunner.class);
+ ExperimentalOptions.addExperiment(
+ deprecatedReadOptions.as(ExperimentalOptions.class), "use_deprecated_read");
+
+ Pipeline deprecatedReadAllowed = Pipeline.create(deprecatedReadOptions);
+ deprecatedReadAllowed.apply(Read.from(new FakeBoundedSource()));
+ deprecatedReadAllowed.apply(
+ Read.from(new BoundedToUnboundedSourceAdapter<>(new FakeBoundedSource())));
+ // We expect that the experiment will skip validation.
+ SplittableParDo.validateNoPrimitiveReads(deprecatedReadAllowed);
+ }
+
+ /** With {@code beam_fn_api}, bounded and unbounded Reads are expected to pass validation. */
+ @Test
+ public void testValidateNoPrimitiveReadsWhenThereAreNone() {
+ PipelineOptions sdfOptions = PipelineOptionsFactory.create();
+ sdfOptions.setRunner(CrashingRunner.class);
+ ExperimentalOptions.addExperiment(sdfOptions.as(ExperimentalOptions.class), "beam_fn_api");
+ Pipeline sdf = Pipeline.create(sdfOptions);
+ sdf.apply(Read.from(new FakeBoundedSource()));
+ sdf.apply(Read.from(new BoundedToUnboundedSourceAdapter<>(new FakeBoundedSource())));
+ // We expect that the experiment will have caused the transform to use SDF wrappers during
+ // execution.
+ SplittableParDo.validateNoPrimitiveReads(sdf);
+ }
+
+ /**
+ * Without any experiment, a bounded Read and an unbounded Read (each in its own pipeline, sharing
+ * the same options) must each be rejected with an {@link IllegalArgumentException}.
+ */
+ @Test
+ public void testValidateNoPrimitiveReadsFindsPrimitiveReads() {
+ PipelineOptions noSdfOptions = PipelineOptionsFactory.create();
+ noSdfOptions.setRunner(CrashingRunner.class);
+ Pipeline boundedRead = Pipeline.create(noSdfOptions);
+ boundedRead.apply(Read.from(new FakeBoundedSource()));
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> SplittableParDo.validateNoPrimitiveReads(boundedRead));
+
+ Pipeline unboundedRead = Pipeline.create(noSdfOptions);
+ unboundedRead.apply(Read.from(new BoundedToUnboundedSourceAdapter<>(new FakeBoundedSource())));
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> SplittableParDo.validateNoPrimitiveReads(unboundedRead));
+ }
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index f7c74c0..d980588 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -179,6 +179,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
DisplayDataValidator.validatePipeline(pipeline);
DisplayDataValidator.validateOptions(options);
+ SplittableParDo.validateNoPrimitiveReads(pipeline);
ExecutorService metricsPool =
Executors.newCachedThreadPool(
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
index f5f0242..144318b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
@@ -19,9 +19,13 @@ package org.apache.beam.runners.direct;
import java.util.IdentityHashMap;
import java.util.Map;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.util.MutationDetector;
import org.apache.beam.sdk.util.MutationDetectors;
@@ -40,9 +44,63 @@ class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
@Override
public <T> ModelEnforcement<T> forBundle(
CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+ // Transforms that are, or are nested within, a Read composite skip immutability checking.
+ if (isReadTransform(consumer)) {
+ return NoopReadEnforcement.instance();
+ }
return new ImmutabilityCheckingEnforcement<>(input, consumer);
}
+ /**
+ * Returns whether {@code consumer} is a composite {@link Read.Bounded} / {@link Read.Unbounded}
+ * transform or is nested within one. A primitive (unexpanded) Read is not matched, because only
+ * composite Read nodes mark the start of a Read scope.
+ *
+ * <p>NOTE(review): this traverses the entire pipeline on each call, and {@link #forBundle} calls
+ * it for every bundle; consider caching the result per transform if this becomes a bottleneck.
+ */
+ private static boolean isReadTransform(AppliedPTransform<?, ?, ?> consumer) {
+ IsReadVisitor visitor = new IsReadVisitor(consumer.getTransform());
+ consumer.getPipeline().traverseTopologically(visitor);
+ return visitor.isRead();
+ }
+
+ /** Returns whether {@code node} holds a {@link Read.Bounded} or {@link Read.Unbounded}. */
+ private static boolean isReadNode(Node node) {
+ return node.getTransform() instanceof Read.Bounded
+ || node.getTransform() instanceof Read.Unbounded;
+ }
+
+ /**
+ * A {@link PipelineVisitor} that records whether a given transform instance is visited while
+ * inside a {@link Read.Bounded} or {@link Read.Unbounded} composite (the composite itself
+ * included).
+ */
+ private static class IsReadVisitor extends PipelineVisitor.Defaults {
+ private final PTransform<?, ?> transform;
+ private boolean isRead;
+ private boolean isInsideRead;
+
+ private IsReadVisitor(PTransform<?, ?> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(Node node) {
+ if (isReadNode(node)) {
+ isInsideRead = true;
+ }
+ // Identity comparison: we are looking for this exact transform instance.
+ if (isInsideRead && node.getTransform() == transform) {
+ isRead = true;
+ }
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void leaveCompositeTransform(Node node) {
+ if (isReadNode(node)) {
+ isInsideRead = false;
+ }
+ }
+
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ if (isInsideRead && node.getTransform() == transform) {
+ isRead = true;
+ }
+ }
+
+ private boolean isRead() {
+ return isRead;
+ }
+ }
+
+ /**
+ * The {@link ModelEnforcement} used for Read transforms. It overrides nothing from {@link
+ * AbstractModelEnforcement}, so it behaves as that class's defaults (presumably no-ops — TODO
+ * confirm). It declares no state of its own, so a single shared instance is reused.
+ */
+ private static class NoopReadEnforcement<T> extends AbstractModelEnforcement<T> {
+ private static final NoopReadEnforcement<Object> INSTANCE = new NoopReadEnforcement<>();
+
+ /** Returns the shared instance, typed for the caller's element type. */
+ @SuppressWarnings("unchecked")
+ private static <ElementT> ModelEnforcement<ElementT> instance() {
+ // Safe: this class declares no fields and never handles elements of type ElementT itself.
+ return (ModelEnforcement<ElementT>) (ModelEnforcement<?>) INSTANCE;
+ }
+ }
+
private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> {
private final AppliedPTransform<?, ?, ?> transform;
private final Map<WindowedValue<T>, MutationDetector> mutationElements;
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 52334cf..ccb1971 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -77,7 +77,12 @@ public class BoundedReadEvaluatorFactoryTest {
private BundleFactory bundleFactory;
private AppliedPTransform<?, ?, ?> longsProducer;
- @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+ @Rule
+ public TestPipeline p =
+ TestPipeline.fromOptions(
+ PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create())
+ .enableAbandonedNodeEnforcement(false);
+
private PipelineOptions options;
@Before
@@ -86,10 +91,9 @@ public class BoundedReadEvaluatorFactoryTest {
source = CountingSource.upTo(10L);
longs = p.apply(Read.from(source));
- options = PipelineOptionsFactory.create();
factory =
new BoundedReadEvaluatorFactory(
- context, options, Long.MAX_VALUE /* minimum size for dynamic splits */);
+ context, p.getOptions(), Long.MAX_VALUE /* minimum size for dynamic splits */);
bundleFactory = ImmutableListBundleFactory.create();
longsProducer = DirectGraphs.getProducer(longs);
}
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index c7536aa..b89e00a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -25,18 +25,17 @@ import static org.junit.Assert.assertThat;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Flatten.PCollections;
+import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
@@ -86,24 +85,19 @@ public class DirectGraphVisitorTest implements Serializable {
@Test
public void getRootTransformsContainsRootTransforms() {
- PCollection<String> created = p.apply(Create.of("foo", "bar"));
- PCollection<Long> counted = p.apply(Read.from(CountingSource.upTo(1234L)));
- PCollection<Long> unCounted = p.apply(GenerateSequence.from(0));
+ // Impulse is the only root; WithKeys consumes the impulse output, so it must not be a root.
+ PCollection<byte[]> impulse = p.apply(Impulse.create());
+ impulse.apply(WithKeys.of("abc"));
p.traverseTopologically(visitor);
DirectGraph graph = visitor.getGraph();
- assertThat(graph.getRootTransforms(), hasSize(3));
+ assertThat(graph.getRootTransforms(), hasSize(1));
assertThat(
graph.getRootTransforms(),
- Matchers.containsInAnyOrder(
- new Object[] {
- graph.getProducer(created), graph.getProducer(counted), graph.getProducer(unCounted)
- }));
+ Matchers.containsInAnyOrder(new Object[] {graph.getProducer(impulse)}));
for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms()) {
// Root transforms will have no inputs
assertThat(root.getInputs().entrySet(), emptyIterable());
assertThat(
- Iterables.getOnlyElement(root.getOutputs().values()),
- Matchers.<POutput>isOneOf(created, counted, unCounted));
+ Iterables.getOnlyElement(root.getOutputs().values()), Matchers.<POutput>isOneOf(impulse));
}
}
@@ -198,8 +192,11 @@ public class DirectGraphVisitorTest implements Serializable {
p.traverseTopologically(visitor);
DirectGraph graph = visitor.getGraph();
- assertThat(graph.getStepName(graph.getProducer(created)), equalTo("s0"));
- assertThat(graph.getStepName(graph.getProducer(transformed)), equalTo("s1"));
+ // Step names are of the format "s#" such as "s0", "s1", ...
+ int createdStepIndex =
+ Integer.parseInt(graph.getStepName(graph.getProducer(created)).substring(1));
+ assertThat(
+ graph.getStepName(graph.getProducer(transformed)), equalTo("s" + (createdStepIndex + 1)));
// finished doesn't have a producer, because it's not a PValue.
// TODO: Demonstrate that PCollectionList/Tuple and other composite PValues are either safe to
// use, or make them so.
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index fbcf0c0..b493d2f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -601,7 +601,6 @@ public class DirectRunnerTest implements Serializable {
Pipeline p = getPipeline();
p.apply(GenerateSequence.from(0)).setCoder(new LongNoDecodeCoder());
- thrown.expectCause(isA(CoderException.class));
thrown.expectMessage("Cannot decode a long");
p.run();
}
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index aa1dd41..3e25b97 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -46,13 +46,12 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -79,10 +78,10 @@ import org.junit.runners.JUnit4;
public class EvaluationContextTest {
private EvaluationContext context;
- private PCollection<Integer> created;
- private PCollection<KV<String, Integer>> downstream;
- private PCollectionView<Iterable<Integer>> view;
- private PCollection<Long> unbounded;
+ private PCollection<byte[]> impulse;
+ private PCollection<KV<String, byte[]>> downstream;
+ private PCollectionView<Iterable<byte[]>> view;
+ private PCollection<byte[]> unbounded;
private DirectGraph graph;
@@ -97,10 +96,10 @@ public class EvaluationContextTest {
public void setup() {
DirectRunner runner = DirectRunner.fromOptions(PipelineOptionsFactory.create());
- created = p.apply(Create.of(1, 2, 3));
- downstream = created.apply(WithKeys.of("foo"));
- view = created.apply(View.asIterable());
- unbounded = p.apply(GenerateSequence.from(0));
+ impulse = p.apply(Impulse.create());
+ downstream = impulse.apply(WithKeys.of("foo"));
+ view = impulse.apply(View.asIterable());
+ unbounded = p.apply(Impulse.create()).setIsBoundedInternal(IsBounded.UNBOUNDED);
p.replaceAll(runner.defaultTransformOverrides());
@@ -118,7 +117,7 @@ public class EvaluationContextTest {
keyedPValueTrackingVisitor.getKeyedPValues(),
Executors.newSingleThreadExecutor());
- createdProducer = graph.getProducer(created);
+ createdProducer = graph.getProducer(impulse);
downstreamProducer = graph.getProducer(downstream);
viewProducer = graph.getProducer(view);
unboundedProducer = graph.getProducer(unbounded);
@@ -126,7 +125,7 @@ public class EvaluationContextTest {
@Test
public void writeToViewWriterThenReadReads() {
- PCollectionViewWriter<?, Iterable<Integer>> viewWriter =
+ PCollectionViewWriter<?, Iterable<byte[]>> viewWriter =
context.createPCollectionViewWriter(
PCollection.createPrimitiveOutputInternal(
p,
@@ -187,7 +186,7 @@ public class EvaluationContextTest {
context.handleResult(
ImmutableListBundleFactory.create()
- .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), created)
+ .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), impulse)
.commit(Instant.now()),
ImmutableList.of(),
StepTransformResult.withoutHold(createdProducer)
@@ -262,7 +261,7 @@ public class EvaluationContextTest {
StepTransformResult.withoutHold(downstreamProducer).withState(state).build();
context.handleResult(
- context.createKeyedBundle(myKey, created).commit(Instant.now()),
+ context.createKeyedBundle(myKey, impulse).commit(Instant.now()),
ImmutableList.of(),
stateResult);
@@ -330,7 +329,7 @@ public class EvaluationContextTest {
// haven't added any timers, must be empty
assertThat(context.extractFiredTimers(), emptyIterable());
context.handleResult(
- context.createKeyedBundle(key, created).commit(Instant.now()),
+ context.createKeyedBundle(key, impulse).commit(Instant.now()),
ImmutableList.of(),
timerResult);
@@ -355,7 +354,7 @@ public class EvaluationContextTest {
@Test
public void createKeyedBundleKeyed() {
+ // A bundle created for a key must report that same key once committed.
StructuralKey<String> key = StructuralKey.of("foo", StringUtf8Coder.of());
- CommittedBundle<KV<String, Integer>> keyedBundle =
+ CommittedBundle<KV<String, byte[]>> keyedBundle =
context.createKeyedBundle(key, downstream).commit(Instant.now());
assertThat(keyedBundle.getKey(), equalTo(key));
}
@@ -374,8 +373,8 @@ public class EvaluationContextTest {
public void isDoneWithPartiallyDone() {
assertThat(context.isDone(), is(false));
- UncommittedBundle<Integer> rootBundle = context.createBundle(created);
- rootBundle.add(WindowedValue.valueInGlobalWindow(1));
+ UncommittedBundle<byte[]> rootBundle = context.createBundle(impulse);
+ rootBundle.add(WindowedValue.valueInGlobalWindow(new byte[0]));
CommittedResult handleResult =
context.handleResult(
null,
@@ -390,7 +389,7 @@ public class EvaluationContextTest {
null, ImmutableList.of(), StepTransformResult.withoutHold(unboundedProducer).build());
assertThat(context.isDone(), is(false));
- for (AppliedPTransform<?, ?, ?> consumers : graph.getPerElementConsumers(created)) {
+ for (AppliedPTransform<?, ?, ?> consumers : graph.getPerElementConsumers(impulse)) {
context.handleResult(
committedBundle, ImmutableList.of(), StepTransformResult.withoutHold(consumers).build());
}
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 4b257be..4033d3b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -106,17 +106,20 @@ public class UnboundedReadEvaluatorFactoryTest {
private DirectGraph graph;
@Rule public ExpectedException thrown = ExpectedException.none();
- @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
- private PipelineOptions options;
+
+ @Rule
+ public TestPipeline p =
+ TestPipeline.fromOptions(
+ PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create())
+ .enableAbandonedNodeEnforcement(false);
@Before
public void setup() {
source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
longs = p.apply(Read.from(source));
- options = PipelineOptionsFactory.create();
context = mock(EvaluationContext.class);
- factory = new UnboundedReadEvaluatorFactory(context, options);
+ factory = new UnboundedReadEvaluatorFactory(context, p.getOptions());
output = bundleFactory.createBundle(longs);
graph = DirectGraphs.getGraph(p);
when(context.createBundle(longs)).thenReturn(output);
@@ -128,7 +131,7 @@ public class UnboundedReadEvaluatorFactoryTest {
int numSplits = 5;
Collection<CommittedBundle<?>> initialInputs =
- new UnboundedReadEvaluatorFactory.InputProvider(context, options)
+ new UnboundedReadEvaluatorFactory.InputProvider(context, p.getOptions())
.getInitialInputs(graph.getProducer(longs), numSplits);
// CountingSource.unbounded has very good splitting behavior
assertThat(initialInputs, hasSize(numSplits));
@@ -161,7 +164,7 @@ public class UnboundedReadEvaluatorFactoryTest {
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
Collection<CommittedBundle<?>> initialInputs =
- new UnboundedReadEvaluatorFactory.InputProvider(context, options)
+ new UnboundedReadEvaluatorFactory.InputProvider(context, p.getOptions())
.getInitialInputs(graph.getProducer(longs), 1);
CommittedBundle<?> inputShards = Iterables.getOnlyElement(initialInputs);
@@ -202,7 +205,7 @@ public class UnboundedReadEvaluatorFactoryTest {
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
Collection<CommittedBundle<?>> initialInputs =
- new UnboundedReadEvaluatorFactory.InputProvider(context, options)
+ new UnboundedReadEvaluatorFactory.InputProvider(context, p.getOptions())
.getInitialInputs(sourceTransform, 1);
UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
@@ -242,7 +245,7 @@ public class UnboundedReadEvaluatorFactoryTest {
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
Collection<CommittedBundle<?>> initialInputs =
- new UnboundedReadEvaluatorFactory.InputProvider(context, options)
+ new UnboundedReadEvaluatorFactory.InputProvider(context, p.getOptions())
.getInitialInputs(sourceTransform, 1);
// Process the initial shard. This might produce some output, and will produce a residual shard
@@ -311,8 +314,8 @@ public class UnboundedReadEvaluatorFactoryTest {
.add(shard)
.commit(Instant.now());
UnboundedReadEvaluatorFactory factory =
- new UnboundedReadEvaluatorFactory(context, options, 1.0 /* Always reuse */);
- new UnboundedReadEvaluatorFactory.InputProvider(context, options)
+ new UnboundedReadEvaluatorFactory(context, p.getOptions(), 1.0 /* Always reuse */);
+ new UnboundedReadEvaluatorFactory.InputProvider(context, p.getOptions())
.getInitialInputs(sourceTransform, 1);
CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual = inputBundle;
@@ -356,7 +359,7 @@ public class UnboundedReadEvaluatorFactoryTest {
.add(shard)
.commit(Instant.now());
UnboundedReadEvaluatorFactory factory =
- new UnboundedReadEvaluatorFactory(context, options, 0.0 /* never reuse */);
+ new UnboundedReadEvaluatorFactory(context, p.getOptions(), 0.0 /* never reuse */);
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
factory.forApplication(sourceTransform, inputBundle);
evaluator.processElement(shard);
@@ -402,7 +405,7 @@ public class UnboundedReadEvaluatorFactoryTest {
.add(shard)
.commit(Instant.now());
UnboundedReadEvaluatorFactory factory =
- new UnboundedReadEvaluatorFactory(context, options, 0.0 /* never reuse */);
+ new UnboundedReadEvaluatorFactory(context, p.getOptions(), 0.0 /* never reuse */);
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
factory.forApplication(sourceTransform, inputBundle);
thrown.expect(IOException.class);
@@ -438,10 +441,10 @@ public class UnboundedReadEvaluatorFactoryTest {
emptySet(),
Executors.newCachedThreadPool());
final UnboundedReadEvaluatorFactory factory =
- new UnboundedReadEvaluatorFactory(context, options);
+ new UnboundedReadEvaluatorFactory(context, p.getOptions());
final Read.Unbounded<String> unbounded = Read.from(source);
- final Pipeline pipeline = Pipeline.create(options);
+ final Pipeline pipeline = Pipeline.create(p.getOptions());
final PCollection<String> pCollection = pipeline.apply(unbounded);
final AppliedPTransform<PBegin, PCollection<String>, Read.Unbounded<String>> application =
AppliedPTransform.of(
@@ -452,7 +455,8 @@ public class UnboundedReadEvaluatorFactoryTest {
pipeline);
final TransformEvaluator<UnboundedSourceShard<String, TestCheckpointMark>> evaluator =
factory.forApplication(application, null);
- final UnboundedSource.UnboundedReader<String> reader = source.createReader(options, null);
+ final UnboundedSource.UnboundedReader<String> reader =
+ source.createReader(p.getOptions(), null);
final UnboundedSourceShard<String, TestCheckpointMark> shard =
UnboundedSourceShard.of(source, new NeverDeduplicator(), reader, null);
final WindowedValue<UnboundedSourceShard<String, TestCheckpointMark>> value =
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 6515e22..a18b339 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -51,10 +51,10 @@ import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -80,21 +80,26 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
-/** Tests for {@link WatermarkManager}. */
+/**
+ * Tests for {@link WatermarkManager}.
+ *
+ * <p>Note that the tests below output multiple elements from impulse for the purpose of watermark
+ * tracking while impulse would normally only output a single empty byte array in the global window.
+ */
@RunWith(JUnit4.class)
public class WatermarkManagerTest implements Serializable {
@Rule public transient ExpectedException thrown = ExpectedException.none();
private transient MockClock clock;
- private transient PCollection<Integer> createdInts;
+ private transient PCollection<byte[]> impulse;
- private transient PCollection<Integer> filtered;
- private transient PCollection<Integer> filteredTimesTwo;
- private transient PCollection<KV<String, Integer>> keyed;
+ private transient PCollection<byte[]> filtered;
+ private transient PCollection<byte[]> filteredNotEmpty;
+ private transient PCollection<KV<String, byte[]>> keyed;
- private transient PCollection<Integer> intsToFlatten;
- private transient PCollection<Integer> flattened;
+ private transient PCollection<byte[]> impulseToFlatten;
+ private transient PCollection<byte[]> flattened;
private transient WatermarkManager<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> manager;
private transient BundleFactory bundleFactory;
@@ -106,24 +111,25 @@ public class WatermarkManagerTest implements Serializable {
@Before
public void setup() {
- createdInts = p.apply("createdInts", Create.of(1, 2, 3));
-
- filtered = createdInts.apply("filtered", Filter.greaterThan(1));
- filteredTimesTwo =
+ impulse = p.apply(Impulse.create());
+ filtered = impulse.apply("filtered", Filter.by((element) -> element.length == 0));
+ filteredNotEmpty =
filtered.apply(
"timesTwo",
ParDo.of(
- new DoFn<Integer, Integer>() {
+ new DoFn<byte[], byte[]>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- c.output(c.element() * 2);
+ if (c.element().length > 0) {
+ c.output(c.element());
+ }
}
}));
- keyed = createdInts.apply("keyed", WithKeys.of("MyKey"));
+ keyed = impulse.apply("keyed", WithKeys.of("MyKey"));
- intsToFlatten = p.apply("intsToFlatten", Create.of(-1, 256, 65535));
- PCollectionList<Integer> preFlatten = PCollectionList.of(createdInts).and(intsToFlatten);
+ impulseToFlatten = p.apply("impulseToFlatten", Impulse.create());
+ PCollectionList<byte[]> preFlatten = PCollectionList.of(impulse).and(impulseToFlatten);
flattened = preFlatten.apply("flattened", Flatten.pCollections());
clock = MockClock.fromInstant(new Instant(1000));
@@ -140,7 +146,7 @@ public class WatermarkManagerTest implements Serializable {
*/
@Test
public void getWatermarkForUntouchedTransform() {
- TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(impulse));
assertThat(watermarks.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
assertThat(watermarks.getOutputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
@@ -152,17 +158,16 @@ public class WatermarkManagerTest implements Serializable {
*/
@Test
public void getWatermarkForUpdatedSourceTransform() {
- CommittedBundle<Integer> output = multiWindowedBundle(createdInts, 1);
+ CommittedBundle<byte[]> output = multiWindowedBundle(impulse, new byte[1]);
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.singleton(output),
new Instant(8000L));
manager.refreshAll();
- TransformWatermarks updatedSourceWatermark =
- manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks updatedSourceWatermark = manager.getWatermarks(graph.getProducer(impulse));
assertThat(updatedSourceWatermark.getOutputWatermark(), equalTo(new Instant(8000L)));
}
@@ -173,20 +178,20 @@ public class WatermarkManagerTest implements Serializable {
*/
@Test
public void getWatermarkForMultiInputTransform() {
- CommittedBundle<Integer> secondPcollectionBundle = multiWindowedBundle(intsToFlatten, -1);
+ CommittedBundle<byte[]> secondPcollectionBundle =
+ multiWindowedBundle(impulseToFlatten, new byte[1]);
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(intsToFlatten),
+ graph.getProducer(impulseToFlatten),
null,
Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
// We didn't do anything for the first source, so we shouldn't have progressed the watermark
- TransformWatermarks firstSourceWatermark =
- manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks firstSourceWatermark = manager.getWatermarks(graph.getProducer(impulse));
assertThat(
firstSourceWatermark.getOutputWatermark(),
not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
@@ -194,7 +199,7 @@ public class WatermarkManagerTest implements Serializable {
// the Second Source output all of the elements so it should be done (with a watermark at the
// end of time).
TransformWatermarks secondSourceWatermark =
- manager.getWatermarks(graph.getProducer(intsToFlatten));
+ manager.getWatermarks(graph.getProducer(impulseToFlatten));
assertThat(
secondSourceWatermark.getOutputWatermark(),
not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -208,7 +213,8 @@ public class WatermarkManagerTest implements Serializable {
transformWatermark.getOutputWatermark(),
not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
- CommittedBundle<Integer> flattenedBundleSecondCreate = multiWindowedBundle(flattened, -1);
+ CommittedBundle<byte[]> flattenedBundleSecondCreate =
+ multiWindowedBundle(flattened, new byte[1]);
// We have finished processing the bundle from the second PCollection, but we haven't consumed
// anything from the first PCollection yet; so our watermark shouldn't advance
manager.updateWatermarks(
@@ -236,20 +242,19 @@ public class WatermarkManagerTest implements Serializable {
not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
Instant firstCollectionTimestamp = new Instant(10000);
- CommittedBundle<Integer> firstPcollectionBundle =
- timestampedBundle(createdInts, TimestampedValue.of(5, firstCollectionTimestamp));
+ CommittedBundle<byte[]> firstPcollectionBundle =
+ timestampedBundle(impulse, TimestampedValue.of(new byte[5], firstCollectionTimestamp));
// the source is done, but elements are still buffered. The source output watermark should be
// past the end of the global window
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
- TransformWatermarks firstSourceWatermarks =
- manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks firstSourceWatermarks = manager.getWatermarks(graph.getProducer(impulse));
assertThat(
firstSourceWatermarks.getOutputWatermark(),
not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -298,9 +303,9 @@ public class WatermarkManagerTest implements Serializable {
*/
@Test
public void getWatermarkMultiIdenticalInput() {
- PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
- PCollection<Integer> multiConsumer =
- PCollectionList.of(created).and(created).apply(Flatten.pCollections());
+ PCollection<byte[]> impulse = p.apply(Impulse.create());
+ PCollection<byte[]> multiConsumer =
+ PCollectionList.of(impulse).and(impulse).apply(Flatten.pCollections());
DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
p.traverseTopologically(graphVisitor);
DirectGraph graph = graphVisitor.getGraph();
@@ -314,21 +319,21 @@ public class WatermarkManagerTest implements Serializable {
.<Void>createRootBundle()
.add(WindowedValue.valueInGlobalWindow(null))
.commit(clock.now());
- CommittedBundle<Integer> createBundle =
+ CommittedBundle<byte[]> createBundle =
bundleFactory
- .createBundle(created)
- .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(33536)))
+ .createBundle(impulse)
+ .add(WindowedValue.timestampedValueInGlobalWindow(new byte[1], new Instant(33536)))
.commit(clock.now());
Map<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>> initialInputs =
ImmutableMap.<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>>builder()
- .put(graph.getProducer(created), Collections.singleton(root))
+ .put(graph.getProducer(impulse), Collections.singleton(root))
.build();
tstMgr.initialize((Map) initialInputs);
tstMgr.updateWatermarks(
root,
TimerUpdate.empty(),
- graph.getProducer(created),
+ graph.getProducer(impulse),
null,
Collections.singleton(createBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -365,37 +370,36 @@ public class WatermarkManagerTest implements Serializable {
*/
@Test
public void getWatermarkForMultiConsumedCollection() {
- CommittedBundle<Integer> createdBundle =
+ CommittedBundle<byte[]> impulseBundle =
timestampedBundle(
- createdInts,
- TimestampedValue.of(1, new Instant(1_000_000L)),
- TimestampedValue.of(2, new Instant(1234L)),
- TimestampedValue.of(3, new Instant(-1000L)));
+ impulse,
+ TimestampedValue.of(new byte[1], new Instant(1_000_000L)),
+ TimestampedValue.of(new byte[2], new Instant(1234L)),
+ TimestampedValue.of(new byte[3], new Instant(-1000L)));
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.<CommittedBundle<?>>singleton(createdBundle),
+ Collections.<CommittedBundle<?>>singleton(impulseBundle),
new Instant(Long.MAX_VALUE));
manager.refreshAll();
- TransformWatermarks createdAfterProducing =
- manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks impulseAfterProducing = manager.getWatermarks(graph.getProducer(impulse));
assertThat(
- createdAfterProducing.getOutputWatermark(),
+ impulseAfterProducing.getOutputWatermark(),
not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
- CommittedBundle<KV<String, Integer>> keyBundle =
+ CommittedBundle<KV<String, byte[]>> keyBundle =
timestampedBundle(
keyed,
- TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
- TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
- TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+ TimestampedValue.of(KV.of("MyKey", new byte[1]), new Instant(1_000_000L)),
+ TimestampedValue.of(KV.of("MyKey", new byte[2]), new Instant(1234L)),
+ TimestampedValue.of(KV.of("MyKey", new byte[3]), new Instant(-1000L)));
manager.updateWatermarks(
- createdBundle,
+ impulseBundle,
TimerUpdate.empty(),
graph.getProducer(keyed),
- createdBundle.withElements(Collections.emptyList()),
+ impulseBundle.withElements(Collections.emptyList()),
Collections.<CommittedBundle<?>>singleton(keyBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
@@ -409,13 +413,13 @@ public class WatermarkManagerTest implements Serializable {
assertThat(filteredWatermarks.getInputWatermark(), not(greaterThan(new Instant(-1000L))));
assertThat(filteredWatermarks.getOutputWatermark(), not(greaterThan(new Instant(-1000L))));
- CommittedBundle<Integer> filteredBundle =
- timestampedBundle(filtered, TimestampedValue.of(2, new Instant(1234L)));
+ CommittedBundle<byte[]> filteredBundle =
+ timestampedBundle(filtered, TimestampedValue.of(new byte[2], new Instant(1234L)));
manager.updateWatermarks(
- createdBundle,
+ impulseBundle,
TimerUpdate.empty(),
graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
+ impulseBundle.withElements(Collections.emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
@@ -435,31 +439,31 @@ public class WatermarkManagerTest implements Serializable {
*/
@Test
public void updateWatermarkWithWatermarkHolds() {
- CommittedBundle<Integer> createdBundle =
+ CommittedBundle<byte[]> impulseBundle =
timestampedBundle(
- createdInts,
- TimestampedValue.of(1, new Instant(1_000_000L)),
- TimestampedValue.of(2, new Instant(1234L)),
- TimestampedValue.of(3, new Instant(-1000L)));
+ impulse,
+ TimestampedValue.of(new byte[1], new Instant(1_000_000L)),
+ TimestampedValue.of(new byte[2], new Instant(1234L)),
+ TimestampedValue.of(new byte[3], new Instant(-1000L)));
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.<CommittedBundle<?>>singleton(createdBundle),
+ Collections.<CommittedBundle<?>>singleton(impulseBundle),
new Instant(Long.MAX_VALUE));
- CommittedBundle<KV<String, Integer>> keyBundle =
+ CommittedBundle<KV<String, byte[]>> keyBundle =
timestampedBundle(
keyed,
- TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
- TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
- TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+ TimestampedValue.of(KV.of("MyKey", new byte[1]), new Instant(1_000_000L)),
+ TimestampedValue.of(KV.of("MyKey", new byte[2]), new Instant(1234L)),
+ TimestampedValue.of(KV.of("MyKey", new byte[3]), new Instant(-1000L)));
manager.updateWatermarks(
- createdBundle,
+ impulseBundle,
TimerUpdate.empty(),
graph.getProducer(keyed),
- createdBundle.withElements(Collections.emptyList()),
+ impulseBundle.withElements(Collections.emptyList()),
Collections.<CommittedBundle<?>>singleton(keyBundle),
new Instant(500L));
manager.refreshAll();
@@ -475,23 +479,23 @@ public class WatermarkManagerTest implements Serializable {
*/
@Test
public void updateWatermarkWithKeyedWatermarkHolds() {
- CommittedBundle<Integer> firstKeyBundle =
+ CommittedBundle<byte[]> firstKeyBundle =
bundleFactory
- .createKeyedBundle(StructuralKey.of("Odd", StringUtf8Coder.of()), createdInts)
- .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L)))
- .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L)))
+ .createKeyedBundle(StructuralKey.of("Odd", StringUtf8Coder.of()), impulse)
+ .add(WindowedValue.timestampedValueInGlobalWindow(new byte[1], new Instant(1_000_000L)))
+ .add(WindowedValue.timestampedValueInGlobalWindow(new byte[3], new Instant(-1000L)))
.commit(clock.now());
- CommittedBundle<Integer> secondKeyBundle =
+ CommittedBundle<byte[]> secondKeyBundle =
bundleFactory
- .createKeyedBundle(StructuralKey.of("Even", StringUtf8Coder.of()), createdInts)
- .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L)))
+ .createKeyedBundle(StructuralKey.of("Even", StringUtf8Coder.of()), impulse)
+ .add(WindowedValue.timestampedValueInGlobalWindow(new byte[2], new Instant(1234L)))
.commit(clock.now());
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
ImmutableList.of(firstKeyBundle, secondKeyBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -517,9 +521,9 @@ public class WatermarkManagerTest implements Serializable {
filteredWatermarks.getInputWatermark(), not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
assertThat(filteredWatermarks.getOutputWatermark(), not(greaterThan(new Instant(-1000L))));
- CommittedBundle<Integer> fauxFirstKeyTimerBundle =
+ CommittedBundle<byte[]> fauxFirstKeyTimerBundle =
bundleFactory
- .createKeyedBundle(StructuralKey.of("Odd", StringUtf8Coder.of()), createdInts)
+ .createKeyedBundle(StructuralKey.of("Odd", StringUtf8Coder.of()), impulse)
.commit(clock.now());
manager.updateWatermarks(
fauxFirstKeyTimerBundle,
@@ -532,9 +536,9 @@ public class WatermarkManagerTest implements Serializable {
assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L)));
- CommittedBundle<Integer> fauxSecondKeyTimerBundle =
+ CommittedBundle<byte[]> fauxSecondKeyTimerBundle =
bundleFactory
- .createKeyedBundle(StructuralKey.of("Even", StringUtf8Coder.of()), createdInts)
+ .createKeyedBundle(StructuralKey.of("Even", StringUtf8Coder.of()), impulse)
.commit(clock.now());
manager.updateWatermarks(
fauxSecondKeyTimerBundle,
@@ -565,29 +569,29 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void updateOutputWatermarkShouldBeMonotonic() {
CommittedBundle<?> firstInput =
- bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ bundleFactory.createBundle(impulse).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.<CommittedBundle<?>>singleton(firstInput),
new Instant(0L));
manager.refreshAll();
- TransformWatermarks firstWatermarks = manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks firstWatermarks = manager.getWatermarks(graph.getProducer(impulse));
assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L)));
CommittedBundle<?> secondInput =
- bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ bundleFactory.createBundle(impulse).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.<CommittedBundle<?>>singleton(secondInput),
new Instant(-250L));
manager.refreshAll();
- TransformWatermarks secondWatermarks = manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks secondWatermarks = manager.getWatermarks(graph.getProducer(impulse));
assertThat(secondWatermarks.getOutputWatermark(), not(lessThan(new Instant(0L))));
}
@@ -597,31 +601,31 @@ public class WatermarkManagerTest implements Serializable {
*/
@Test
public void updateWatermarkWithHoldsShouldBeMonotonic() {
- CommittedBundle<Integer> createdBundle =
+ CommittedBundle<byte[]> impulseBundle =
timestampedBundle(
- createdInts,
- TimestampedValue.of(1, new Instant(1_000_000L)),
- TimestampedValue.of(2, new Instant(1234L)),
- TimestampedValue.of(3, new Instant(-1000L)));
+ impulse,
+ TimestampedValue.of(new byte[1], new Instant(1_000_000L)),
+ TimestampedValue.of(new byte[2], new Instant(1234L)),
+ TimestampedValue.of(new byte[3], new Instant(-1000L)));
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.<CommittedBundle<?>>singleton(createdBundle),
+ Collections.<CommittedBundle<?>>singleton(impulseBundle),
new Instant(Long.MAX_VALUE));
- CommittedBundle<KV<String, Integer>> keyBundle =
+ CommittedBundle<KV<String, byte[]>> keyBundle =
timestampedBundle(
keyed,
- TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
- TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
- TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+ TimestampedValue.of(KV.of("MyKey", new byte[1]), new Instant(1_000_000L)),
+ TimestampedValue.of(KV.of("MyKey", new byte[2]), new Instant(1234L)),
+ TimestampedValue.of(KV.of("MyKey", new byte[3]), new Instant(-1000L)));
manager.updateWatermarks(
- createdBundle,
+ impulseBundle,
TimerUpdate.empty(),
graph.getProducer(keyed),
- createdBundle.withElements(Collections.emptyList()),
+ impulseBundle.withElements(Collections.emptyList()),
Collections.<CommittedBundle<?>>singleton(keyBundle),
new Instant(500L));
manager.refreshAll();
@@ -641,35 +645,31 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void updateWatermarkWithUnprocessedElements() {
- WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(1);
- WindowedValue<Integer> second =
- WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-1000L));
- WindowedValue<Integer> third =
- WindowedValue.timestampedValueInGlobalWindow(3, new Instant(1234L));
- CommittedBundle<Integer> createdBundle =
- bundleFactory
- .createBundle(createdInts)
- .add(first)
- .add(second)
- .add(third)
- .commit(clock.now());
+ WindowedValue<byte[]> first = WindowedValue.valueInGlobalWindow(new byte[1]);
+ WindowedValue<byte[]> second =
+ WindowedValue.timestampedValueInGlobalWindow(new byte[2], new Instant(-1000L));
+ WindowedValue<byte[]> third =
+ WindowedValue.timestampedValueInGlobalWindow(new byte[3], new Instant(1234L));
+ CommittedBundle<byte[]> impulseBundle =
+ bundleFactory.createBundle(impulse).add(first).add(second).add(third).commit(clock.now());
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.<CommittedBundle<?>>singleton(createdBundle),
+ Collections.<CommittedBundle<?>>singleton(impulseBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
- CommittedBundle<KV<String, Integer>> keyBundle =
+ CommittedBundle<KV<String, byte[]>> keyBundle =
timestampedBundle(
- keyed, TimestampedValue.of(KV.of("MyKey", 1), BoundedWindow.TIMESTAMP_MIN_VALUE));
+ keyed,
+ TimestampedValue.of(KV.of("MyKey", new byte[1]), BoundedWindow.TIMESTAMP_MIN_VALUE));
manager.updateWatermarks(
- createdBundle,
+ impulseBundle,
TimerUpdate.empty(),
graph.getProducer(keyed),
- createdBundle.withElements(ImmutableList.of(second, third)),
+ impulseBundle.withElements(ImmutableList.of(second, third)),
Collections.<CommittedBundle<?>>singleton(keyBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks keyedWatermarks = manager.getWatermarks(graph.getProducer(keyed));
@@ -679,28 +679,29 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void updateWatermarkWithCompletedElementsNotPending() {
- WindowedValue<Integer> first = WindowedValue.timestampedValueInGlobalWindow(1, new Instant(22));
- CommittedBundle<Integer> createdBundle =
- bundleFactory.createBundle(createdInts).add(first).commit(clock.now());
+ WindowedValue<byte[]> first =
+ WindowedValue.timestampedValueInGlobalWindow(new byte[1], new Instant(22));
+ CommittedBundle<byte[]> impulseBundle =
+ bundleFactory.createBundle(impulse).add(first).commit(clock.now());
- WindowedValue<Integer> second =
- WindowedValue.timestampedValueInGlobalWindow(2, new Instant(22));
- CommittedBundle<Integer> neverCreatedBundle =
- bundleFactory.createBundle(createdInts).add(second).commit(clock.now());
+ WindowedValue<byte[]> second =
+ WindowedValue.timestampedValueInGlobalWindow(new byte[2], new Instant(22));
+ CommittedBundle<byte[]> neverImpulseBundle =
+ bundleFactory.createBundle(impulse).add(second).commit(clock.now());
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.<CommittedBundle<?>>singleton(createdBundle),
+ Collections.<CommittedBundle<?>>singleton(impulseBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(
- neverCreatedBundle,
+ neverImpulseBundle,
TimerUpdate.empty(),
graph.getProducer(filtered),
- neverCreatedBundle.withElements(Collections.emptyList()),
+ neverImpulseBundle.withElements(Collections.emptyList()),
Collections.emptyList(),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -713,32 +714,32 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void updateWatermarkWithLateData() {
Instant sourceWatermark = new Instant(1_000_000L);
- CommittedBundle<Integer> createdBundle =
+ CommittedBundle<byte[]> impulseBundle =
timestampedBundle(
- createdInts,
- TimestampedValue.of(1, sourceWatermark),
- TimestampedValue.of(2, new Instant(1234L)));
+ impulse,
+ TimestampedValue.of(new byte[1], sourceWatermark),
+ TimestampedValue.of(new byte[2], new Instant(1234L)));
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.<CommittedBundle<?>>singleton(createdBundle),
+ Collections.<CommittedBundle<?>>singleton(impulseBundle),
sourceWatermark);
- CommittedBundle<KV<String, Integer>> keyBundle =
+ CommittedBundle<KV<String, byte[]>> keyBundle =
timestampedBundle(
keyed,
- TimestampedValue.of(KV.of("MyKey", 1), sourceWatermark),
- TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)));
+ TimestampedValue.of(KV.of("MyKey", new byte[1]), sourceWatermark),
+ TimestampedValue.of(KV.of("MyKey", new byte[2]), new Instant(1234L)));
// Finish processing the on-time data. The watermarks should progress to be equal to the source
manager.updateWatermarks(
- createdBundle,
+ impulseBundle,
TimerUpdate.empty(),
graph.getProducer(keyed),
- createdBundle.withElements(Collections.emptyList()),
+ impulseBundle.withElements(Collections.emptyList()),
Collections.<CommittedBundle<?>>singleton(keyBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
@@ -746,19 +747,19 @@ public class WatermarkManagerTest implements Serializable {
assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark));
assertThat(onTimeWatermarks.getOutputWatermark(), equalTo(sourceWatermark));
- CommittedBundle<Integer> lateDataBundle =
- timestampedBundle(createdInts, TimestampedValue.of(3, new Instant(-1000L)));
+ CommittedBundle<byte[]> lateDataBundle =
+ timestampedBundle(impulse, TimestampedValue.of(new byte[3], new Instant(-1000L)));
// the late data arrives in a downstream PCollection after its watermark has advanced past it;
// we don't advance the watermark past the current watermark until we've consumed the late data
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
- createdBundle.withElements(Collections.emptyList()),
+ graph.getProducer(impulse),
+ impulseBundle.withElements(Collections.emptyList()),
Collections.<CommittedBundle<?>>singleton(lateDataBundle),
new Instant(2_000_000L));
manager.refreshAll();
- TransformWatermarks bufferedLateWm = manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks bufferedLateWm = manager.getWatermarks(graph.getProducer(impulse));
assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new Instant(2_000_000L)));
// The input watermark should be held to its previous value (not advanced due to late data; not
@@ -767,8 +768,9 @@ public class WatermarkManagerTest implements Serializable {
assertThat(lateDataBufferedWatermark.getInputWatermark(), not(lessThan(sourceWatermark)));
assertThat(lateDataBufferedWatermark.getOutputWatermark(), not(lessThan(sourceWatermark)));
- CommittedBundle<KV<String, Integer>> lateKeyedBundle =
- timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+ CommittedBundle<KV<String, byte[]>> lateKeyedBundle =
+ timestampedBundle(
+ keyed, TimestampedValue.of(KV.of("MyKey", new byte[3]), new Instant(-1000L)));
manager.updateWatermarks(
lateDataBundle,
TimerUpdate.empty(),
@@ -785,25 +787,25 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.<CommittedBundle<?>>singleton(
bundleFactory
- .createBundle(createdInts)
- .add(WindowedValue.valueInGlobalWindow(1))
+ .createBundle(impulse)
+ .add(WindowedValue.valueInGlobalWindow(new byte[1]))
.commit(Instant.now())),
BoundedWindow.TIMESTAMP_MAX_VALUE);
- CommittedBundle<Integer> createdBundle =
+ CommittedBundle<byte[]> impulseBundle =
bundleFactory
- .createBundle(createdInts)
- .add(WindowedValue.valueInGlobalWindow(1))
+ .createBundle(impulse)
+ .add(WindowedValue.valueInGlobalWindow(new byte[1]))
.commit(Instant.now());
manager.updateWatermarks(
- createdBundle,
+ impulseBundle,
TimerUpdate.empty(),
graph.getProducer(keyed),
- createdBundle.withElements(Collections.emptyList()),
+ impulseBundle.withElements(Collections.emptyList()),
Collections.emptyList(),
null);
manager.refreshAll();
@@ -817,17 +819,16 @@ public class WatermarkManagerTest implements Serializable {
*/
@Test
public void getWatermarksAfterOnlyEmptyOutput() {
- CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
+ CommittedBundle<byte[]> emptyCreateOutput = multiWindowedBundle(impulse);
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
- TransformWatermarks updatedSourceWatermarks =
- manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(graph.getProducer(impulse));
assertThat(
updatedSourceWatermarks.getOutputWatermark(),
@@ -849,21 +850,22 @@ public class WatermarkManagerTest implements Serializable {
*/
@Test
public void getWatermarksAfterHoldAndEmptyOutput() {
- CommittedBundle<Integer> firstCreateOutput = multiWindowedBundle(createdInts, 1, 2);
+ CommittedBundle<byte[]> firstImpulseOutput =
+ multiWindowedBundle(impulse, new byte[1], new byte[2]);
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.<CommittedBundle<?>>singleton(firstCreateOutput),
+ Collections.<CommittedBundle<?>>singleton(firstImpulseOutput),
new Instant(12_000L));
- CommittedBundle<Integer> firstFilterOutput = multiWindowedBundle(filtered);
+ CommittedBundle<byte[]> firstFilterOutput = multiWindowedBundle(filtered);
manager.updateWatermarks(
- firstCreateOutput,
+ firstImpulseOutput,
TimerUpdate.empty(),
graph.getProducer(filtered),
- firstCreateOutput.withElements(Collections.emptyList()),
+ firstImpulseOutput.withElements(Collections.emptyList()),
Collections.<CommittedBundle<?>>singleton(firstFilterOutput),
new Instant(10_000L));
manager.refreshAll();
@@ -871,17 +873,16 @@ public class WatermarkManagerTest implements Serializable {
assertThat(firstFilterWatermarks.getInputWatermark(), not(lessThan(new Instant(12_000L))));
assertThat(firstFilterWatermarks.getOutputWatermark(), not(greaterThan(new Instant(10_000L))));
- CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
+ CommittedBundle<byte[]> emptyImpulseOutput = multiWindowedBundle(impulse);
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
+ Collections.<CommittedBundle<?>>singleton(emptyImpulseOutput),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
- TransformWatermarks updatedSourceWatermarks =
- manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(graph.getProducer(impulse));
assertThat(
updatedSourceWatermarks.getOutputWatermark(),
@@ -898,7 +899,7 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
- TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(impulse));
assertThat(watermarks.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
assertThat(
watermarks.getSynchronizedProcessingOutputTime(),
@@ -913,18 +914,18 @@ public class WatermarkManagerTest implements Serializable {
filteredWatermarks.getSynchronizedProcessingOutputTime(),
not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
- CommittedBundle<Integer> createOutput =
- bundleFactory.createBundle(createdInts).commit(new Instant(1250L));
+ CommittedBundle<byte[]> createOutput =
+ bundleFactory.createBundle(impulse).commit(new Instant(1250L));
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.<CommittedBundle<?>>singleton(createOutput),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
- TransformWatermarks createAfterUpdate = manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks createAfterUpdate = manager.getWatermarks(graph.getProducer(impulse));
assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
@@ -945,7 +946,7 @@ public class WatermarkManagerTest implements Serializable {
not(greaterThan(new Instant(1250L))));
CommittedBundle<?> filterOutputBundle =
- bundleFactory.createBundle(intsToFlatten).commit(new Instant(1250L));
+ bundleFactory.createBundle(impulseToFlatten).commit(new Instant(1250L));
manager.updateWatermarks(
createOutput,
TimerUpdate.empty(),
@@ -971,24 +972,26 @@ public class WatermarkManagerTest implements Serializable {
*/
@Test
public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
- CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8);
+ CommittedBundle<byte[]> impulseBundle =
+ multiWindowedBundle(impulse, new byte[1], new byte[2], new byte[4], new byte[8]);
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.<CommittedBundle<?>>singleton(createdBundle),
+ Collections.<CommittedBundle<?>>singleton(impulseBundle),
new Instant(1248L));
manager.refreshAll();
TransformWatermarks filteredWms = manager.getWatermarks(graph.getProducer(filtered));
TransformWatermarks filteredDoubledWms =
- manager.getWatermarks(graph.getProducer(filteredTimesTwo));
+ manager.getWatermarks(graph.getProducer(filteredNotEmpty));
Instant initialFilteredWm = filteredWms.getSynchronizedProcessingOutputTime();
Instant initialFilteredDoubledWm = filteredDoubledWms.getSynchronizedProcessingOutputTime();
StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of());
- CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 8);
+ CommittedBundle<byte[]> filteredBundle =
+ multiWindowedBundle(filtered, new byte[2], new byte[8]);
TimerData pastTimer =
TimerData.of(
StateNamespaces.global(),
@@ -1003,10 +1006,10 @@ public class WatermarkManagerTest implements Serializable {
TimeDomain.PROCESSING_TIME);
TimerUpdate timers = TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build();
manager.updateWatermarks(
- createdBundle,
+ impulseBundle,
timers,
graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
+ impulseBundle.withElements(Collections.emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
@@ -1029,11 +1032,11 @@ public class WatermarkManagerTest implements Serializable {
assertThat(
filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(greaterThan(startTime)));
- CommittedBundle<Integer> filteredTimerBundle =
+ CommittedBundle<byte[]> filteredTimerBundle =
bundleFactory.createKeyedBundle(key, filtered).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
- CommittedBundle<Integer> filteredTimerResult =
+ CommittedBundle<byte[]> filteredTimerResult =
bundleFactory
- .createKeyedBundle(key, filteredTimesTwo)
+ .createKeyedBundle(key, filteredNotEmpty)
.commit(filteredWms.getSynchronizedProcessingOutputTime());
// Complete the processing time timer
manager.updateWatermarks(
@@ -1055,7 +1058,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(
filteredTimerResult,
TimerUpdate.empty(),
- graph.getProducer(filteredTimesTwo),
+ graph.getProducer(filteredNotEmpty),
filteredTimerResult.withElements(Collections.emptyList()),
Collections.emptyList(),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1075,7 +1078,7 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() {
Instant startTime = clock.now();
- TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(impulse));
assertThat(watermarks.getSynchronizedProcessingInputTime(), equalTo(startTime));
TransformWatermarks filteredWatermarks = manager.getWatermarks(graph.getProducer(filtered));
@@ -1087,29 +1090,29 @@ public class WatermarkManagerTest implements Serializable {
filteredWatermarks.getSynchronizedProcessingOutputTime(),
not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
- CommittedBundle<Integer> createOutput =
- bundleFactory.createBundle(createdInts).commit(new Instant(1250L));
+ CommittedBundle<byte[]> createOutput =
+ bundleFactory.createBundle(impulse).commit(new Instant(1250L));
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.<CommittedBundle<?>>singleton(createOutput),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
- TransformWatermarks createAfterUpdate = manager.getWatermarks(graph.getProducer(createdInts));
+ TransformWatermarks createAfterUpdate = manager.getWatermarks(graph.getProducer(impulse));
assertThat(
createAfterUpdate.getSynchronizedProcessingInputTime(), not(greaterThan(clock.now())));
assertThat(
createAfterUpdate.getSynchronizedProcessingOutputTime(), not(greaterThan(clock.now())));
- CommittedBundle<Integer> createSecondOutput =
- bundleFactory.createBundle(createdInts).commit(new Instant(750L));
+ CommittedBundle<byte[]> createSecondOutput =
+ bundleFactory.createBundle(impulse).commit(new Instant(750L));
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.<CommittedBundle<?>>singleton(createSecondOutput),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1120,33 +1123,35 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() {
- CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3);
+ CommittedBundle<byte[]> impulseBundle =
+ multiWindowedBundle(impulse, new byte[1], new byte[2], new byte[3]);
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.<CommittedBundle<?>>singleton(created),
+ Collections.<CommittedBundle<?>>singleton(impulseBundle),
new Instant(40_900L));
manager.refreshAll();
- CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 4);
+ CommittedBundle<byte[]> filteredBundle =
+ multiWindowedBundle(filtered, new byte[2], new byte[4]);
Instant upstreamHold = new Instant(2048L);
TimerData upstreamProcessingTimer =
TimerData.of(
StateNamespaces.global(), upstreamHold, upstreamHold, TimeDomain.PROCESSING_TIME);
manager.updateWatermarks(
- created,
+ impulseBundle,
TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
.setTimer(upstreamProcessingTimer)
.build(),
graph.getProducer(filtered),
- created.withElements(Collections.emptyList()),
+ impulseBundle.withElements(Collections.emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
- TransformWatermarks downstreamWms = manager.getWatermarks(graph.getProducer(filteredTimesTwo));
+ TransformWatermarks downstreamWms = manager.getWatermarks(graph.getProducer(filteredNotEmpty));
assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1157,14 +1162,15 @@ public class WatermarkManagerTest implements Serializable {
// synchronized processing time
assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(upstreamHold));
- CommittedBundle<Integer> otherCreated = multiWindowedBundle(createdInts, 4, 8, 12);
+ CommittedBundle<byte[]> otherImpulse =
+ multiWindowedBundle(impulse, new byte[4], new byte[8], new byte[12]);
manager.updateWatermarks(
- otherCreated,
+ otherImpulse,
TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
.withCompletedTimers(Collections.singleton(upstreamProcessingTimer))
.build(),
graph.getProducer(filtered),
- otherCreated.withElements(Collections.emptyList()),
+ otherImpulse.withElements(Collections.emptyList()),
Collections.emptyList(),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
@@ -1174,30 +1180,31 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() {
- CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3);
+ CommittedBundle<byte[]> impulseBundle =
+ multiWindowedBundle(impulse, new byte[1], new byte[2], new byte[3]);
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.<CommittedBundle<?>>singleton(created),
+ Collections.<CommittedBundle<?>>singleton(impulseBundle),
new Instant(29_919_235L));
Instant upstreamHold = new Instant(2048L);
- CommittedBundle<Integer> filteredBundle =
+ CommittedBundle<byte[]> filteredBundle =
bundleFactory
.createKeyedBundle(StructuralKey.of("key", StringUtf8Coder.of()), filtered)
.commit(upstreamHold);
manager.updateWatermarks(
- created,
+ impulseBundle,
TimerUpdate.empty(),
graph.getProducer(filtered),
- created.withElements(Collections.emptyList()),
+ impulseBundle.withElements(Collections.emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredBundle),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
- TransformWatermarks downstreamWms = manager.getWatermarks(graph.getProducer(filteredTimesTwo));
+ TransformWatermarks downstreamWms = manager.getWatermarks(graph.getProducer(filteredNotEmpty));
assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1212,13 +1219,13 @@ public class WatermarkManagerTest implements Serializable {
assertThat(initialTimers, emptyIterable());
// Advance WM of keyed past the first timer, but ahead of the second and third
- CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
+ CommittedBundle<byte[]> impulseBundle = multiWindowedBundle(filtered);
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.singleton(createdBundle),
+ Collections.singleton(impulseBundle),
new Instant(1500L));
manager.refreshAll();
@@ -1246,11 +1253,11 @@ public class WatermarkManagerTest implements Serializable {
.build();
manager.updateWatermarks(
- createdBundle,
+ impulseBundle,
update,
graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+ impulseBundle.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(impulseToFlatten)),
new Instant(1000L));
manager.refreshAll();
@@ -1263,7 +1270,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.emptyList(),
new Instant(50_000L));
@@ -1298,13 +1305,13 @@ public class WatermarkManagerTest implements Serializable {
assertThat(initialTimers, emptyIterable());
// Advance WM of keyed past the first timer, but ahead of the second and third
- CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
+ CommittedBundle<byte[]> impulseBundle = multiWindowedBundle(filtered);
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.singleton(createdBundle),
+ Collections.singleton(impulseBundle),
new Instant(1500L));
TimerData earliestTimer =
@@ -1334,11 +1341,11 @@ public class WatermarkManagerTest implements Serializable {
.build();
manager.updateWatermarks(
- createdBundle,
+ impulseBundle,
update,
graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+ impulseBundle.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(impulseToFlatten)),
new Instant(1000L));
manager.refreshAll();
@@ -1352,7 +1359,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.emptyList(),
new Instant(50_000L));
@@ -1386,13 +1393,13 @@ public class WatermarkManagerTest implements Serializable {
assertThat(initialTimers, emptyIterable());
// Advance WM of keyed past the first timer, but ahead of the second and third
- CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
+ CommittedBundle<byte[]> impulseBundle = multiWindowedBundle(filtered);
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.singleton(createdBundle),
+ Collections.singleton(impulseBundle),
new Instant(1500L));
TimerData earliestTimer =
@@ -1422,11 +1429,11 @@ public class WatermarkManagerTest implements Serializable {
.build();
manager.updateWatermarks(
- createdBundle,
+ impulseBundle,
update,
graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+ impulseBundle.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(impulseToFlatten)),
new Instant(1000L));
manager.refreshAll();
@@ -1440,7 +1447,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.emptyList(),
new Instant(50_000L));
@@ -1495,7 +1502,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(
null,
initialUpdate,
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.emptyList(),
new Instant(5000L));
@@ -1505,7 +1512,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(
null,
overridingUpdate,
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
Collections.emptyList(),
new Instant(10000L));
@@ -1547,23 +1554,23 @@ public class WatermarkManagerTest implements Serializable {
TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build();
TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build();
- CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
+ CommittedBundle<byte[]> impulseBundle = multiWindowedBundle(filtered);
manager.updateWatermarks(
- createdBundle,
+ impulseBundle,
initialUpdate,
graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+ impulseBundle.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(impulseToFlatten)),
new Instant(1000L));
manager.refreshAll();
// This update should override the previous timer.
manager.updateWatermarks(
- createdBundle,
+ impulseBundle,
overridingUpdate,
graph.getProducer(filtered),
- createdBundle.withElements(Collections.emptyList()),
- Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+ impulseBundle.withElements(Collections.emptyList()),
+ Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(impulseToFlatten)),
new Instant(1000L));
manager.refreshAll();
@@ -1571,9 +1578,9 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- graph.getProducer(createdInts),
+ graph.getProducer(impulse),
null,
- Collections.singleton(createdBundle),
+ Collections.singleton(impulseBundle),
new Instant(3000L));
manager.refreshAll();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 9cb2e28..e3c4f96 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Set;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.InstantCoder;
@@ -62,6 +63,7 @@ import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.ValueWithRecordId.StripIdsDoFn;
import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -134,9 +136,7 @@ public class Read {
public final PCollection<T> expand(PBegin input) {
source.validate();
- if (ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
- && !ExperimentalOptions.hasExperiment(
- input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) {
+ if (useSdf(input.getPipeline().getOptions())) {
// We don't use Create here since Create is defined as a BoundedSource and using it would
// cause an infinite expansion loop. We can reconsider this if Create is implemented
// directly as a SplittableDoFn.
@@ -209,9 +209,7 @@ public class Read {
public final PCollection<T> expand(PBegin input) {
source.validate();
- if (ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
- && !ExperimentalOptions.hasExperiment(
- input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) {
+ if (useSdf(input.getPipeline().getOptions())) {
// We don't use Create here since Create is defined as a BoundedSource and using it would
// cause an infinite expansion loop. We can reconsider this if Create is implemented
// directly as a SplittableDoFn.
@@ -924,4 +922,31 @@ public class Read {
}
private static final int DEFAULT_DESIRED_NUM_SPLITS = 20;
+
+ /**
+ * Used to migrate runners to use splittable DoFn without needing to rely on PTransform
+ * replacement which allows removal of the migration code without changing the pipeline shape
+ * since pipeline shape affects pipeline update for some runners.
+ */
+ private static final Set<String> SPLITTABLE_DOFN_PREFERRED_RUNNERS =
+ ImmutableSet.of("DirectRunner");
+
+ private static boolean useSdf(PipelineOptions options) {
+ // TODO(BEAM-10670): Make this by default true and have runners opt-out instead.
+ boolean runnerPrefersSdf = false;
+ try {
+ runnerPrefersSdf =
+ SPLITTABLE_DOFN_PREFERRED_RUNNERS.contains(options.getRunner().getSimpleName());
+ } catch (Exception e) {
+ // Ignore construction failures since there may not be a runner on the classpath if this is a
+ // test.
+ }
+
+ // We keep the old names of experiments around for portable runners and existing users.
+ return (runnerPrefersSdf
+ || ExperimentalOptions.hasExperiment(options, "beam_fn_api")
+ || ExperimentalOptions.hasExperiment(options, "use_sdf_read"))
+ && !(ExperimentalOptions.hasExperiment(options, "beam_fn_api_use_deprecated_read")
+ || ExperimentalOptions.hasExperiment(options, "use_deprecated_read"));
+ }
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 86a703f..a00b69a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -25,12 +25,10 @@ import static org.apache.beam.sdk.transforms.Requirements.requiresSideInputs;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeFalse;
import java.io.File;
import java.io.FileInputStream;
@@ -48,7 +46,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
-import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
@@ -66,7 +63,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
@@ -74,7 +70,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
-import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
@@ -83,7 +78,6 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -1494,26 +1488,5 @@ public class AvroIOTest implements Serializable {
}
// TODO: for Write only, test withSuffix,
// withShardNameTemplate and withoutSharding.
-
- @Test
- @Category(ValidatesRunner.class)
- public void testPrimitiveReadDisplayData() {
- // Read is no longer a primitive transform when using the portability framework.
- assumeFalse(
- ExperimentalOptions.hasExperiment(
- DisplayDataEvaluator.getDefaultOptions(), "beam_fn_api"));
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-
- AvroIO.Read<GenericRecord> read =
- AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING))
- .withBeamSchemas(withBeamSchemas)
- .from("/foo.*");
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
- assertThat(
- "AvroIO.Read should include the file pattern in its primitive transform",
- displayData,
- hasItem(hasDisplayItem("filePattern")));
- }
}
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
index 4ecfe6a..39266bd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
@@ -23,7 +23,6 @@ import static org.apache.beam.sdk.io.Compression.GZIP;
import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.core.Is.is;
@@ -209,7 +208,6 @@ public class TFRecordIOTest {
@Test
@Category(NeedsRunner.class)
public void testReadInvalidRecord() throws Exception {
- expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Not a valid TFRecord. Fewer than 12 bytes.");
runTestRead("bar".getBytes(Charsets.UTF_8), new String[0]);
}
@@ -217,7 +215,6 @@ public class TFRecordIOTest {
@Test
@Category(NeedsRunner.class)
public void testReadInvalidLengthMask() throws Exception {
- expectedException.expectCause(instanceOf(IOException.class));
expectedException.expectCause(hasMessage(containsString("Mismatch of length mask")));
byte[] data = BaseEncoding.base64().decode(FOO_RECORD_BASE64);
data[9] += (byte) 1;
@@ -227,7 +224,6 @@ public class TFRecordIOTest {
@Test
@Category(NeedsRunner.class)
public void testReadInvalidDataMask() throws Exception {
- expectedException.expectCause(instanceOf(IOException.class));
expectedException.expectCause(hasMessage(containsString("Mismatch of data mask")));
byte[] data = BaseEncoding.base64().decode(FOO_RECORD_BASE64);
data[16] += (byte) 1;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index 14c306e..2721356 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -27,19 +27,15 @@ import static org.apache.beam.sdk.io.Compression.GZIP;
import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED;
import static org.apache.beam.sdk.io.Compression.ZIP;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeFalse;
import java.io.File;
import java.io.FileOutputStream;
@@ -53,14 +49,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
-import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
@@ -69,12 +63,10 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
-import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ToString;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
@@ -495,24 +487,6 @@ public class TextIOReadTest {
assertThat(displayData, hasDisplayItem("compressionType", BZIP2.toString()));
}
- @Test
- @Category(ValidatesRunner.class)
- public void testPrimitiveReadDisplayData() {
- // Read is no longer a primitive transform when using the portability framework.
- assumeFalse(
- ExperimentalOptions.hasExperiment(
- DisplayDataEvaluator.getDefaultOptions(), "beam_fn_api"));
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-
- TextIO.Read read = TextIO.read().from("foobar");
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
- assertThat(
- "TextIO.Read should include the file prefix in its primitive display data",
- displayData,
- hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
- }
-
/** Options for testing. */
public interface RuntimeTestOptions extends PipelineOptions {
ValueProvider<String> getInput();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 3a6ce15..1b415c5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@@ -40,7 +39,6 @@ import java.io.Serializable;
import java.math.BigInteger;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -67,7 +65,6 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -516,44 +513,6 @@ public class BigQueryIOReadTest implements Serializable {
}
@Test
- public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- BigQueryIO.Read read =
- BigQueryIO.read()
- .from("project:dataset.tableId")
- .withTestServices(
- new FakeBigQueryServices()
- .withDatasetService(new FakeDatasetService())
- .withJobService(new FakeJobService()))
- .withoutValidation();
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
- assertThat(
- "BigQueryIO.Read should include the table spec in its primitive display data",
- displayData,
- hasItem(hasDisplayItem("table")));
- }
-
- @Test
- public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- BigQueryIO.Read read =
- BigQueryIO.read()
- .fromQuery("foobar")
- .withTestServices(
- new FakeBigQueryServices()
- .withDatasetService(new FakeDatasetService())
- .withJobService(new FakeJobService()))
- .withoutValidation();
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
- assertThat(
- "BigQueryIO.Read should include the query in its primitive display data",
- displayData,
- hasItem(hasDisplayItem("query")));
- }
-
- @Test
public void testBigQueryIOGetName() {
assertEquals("BigQueryIO.Read", BigQueryIO.read().from("somedataset.sometable").getName());
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index e39ee99..d5d3846 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -53,7 +52,6 @@ import java.io.ByteArrayOutputStream;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
@@ -81,7 +79,6 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -274,14 +271,6 @@ public class BigQueryIOStorageQueryTest {
}
@Test
- public void testEvaluatedDisplayData() throws Exception {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- TypedRead<TableRow> typedRead = getDefaultTypedRead();
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(typedRead);
- assertThat(displayData, hasItem(hasDisplayItem("query")));
- }
-
- @Test
public void testName() {
assertEquals("BigQueryIO.TypedRead", getDefaultTypedRead().getName());
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index 00ffba2..ac5e1af 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -60,7 +59,6 @@ import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumWriter;
@@ -87,7 +85,6 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -294,18 +291,6 @@ public class BigQueryIOStorageReadTest {
}
@Test
- public void testEvaluatedDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- BigQueryIO.TypedRead<TableRow> typedRead =
- BigQueryIO.read(new TableRowParser())
- .withCoder(TableRowJsonCoder.of())
- .withMethod(Method.DIRECT_READ)
- .from("foo.com:project:dataset.table");
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(typedRead);
- assertThat(displayData, hasItem(hasDisplayItem("table")));
- }
-
- @Test
public void testName() {
assertEquals(
"BigQueryIO.TypedRead",
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 02b21c5..8a7d4bf 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -67,7 +67,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
@@ -101,7 +100,6 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -1121,34 +1119,6 @@ public class BigtableIOTest {
}
@Test
- public void testReadingPrimitiveDisplayData() throws IOException, InterruptedException {
- final String table = "fooTable";
- service.createTable(table);
-
- RowFilter rowFilter =
- RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*")).build();
-
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- BigtableIO.Read read =
- BigtableIO.read()
- .withBigtableOptions(BIGTABLE_OPTIONS)
- .withTableId(table)
- .withRowFilter(rowFilter)
- .withBigtableService(service);
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
- assertThat(
- "BigtableIO.Read should include the table id in its primitive display data",
- displayData,
- Matchers.hasItem(hasDisplayItem("tableId")));
- assertThat(
- "BigtableIO.Read should include the row filter, if it exists, in its primitive "
- + "display data",
- displayData,
- Matchers.hasItem(hasDisplayItem("rowFilter")));
- }
-
- @Test
public void testReadingDisplayDataFromRuntimeParameters() {
ReadOptions options = PipelineOptionsFactory.fromArgs().withValidation().as(ReadOptions.class);
BigtableIO.Read read =
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
index 90dfbf8..0b3b40b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.expansion.service.ExpansionService;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
@@ -131,7 +132,12 @@ public class PubsubIOExternalTest {
.withFieldValue("id_label", idAttribute)
.build());
- Pipeline p = Pipeline.create();
+ // Requirements are not propagated through the expansion service, so validating the pipeline
+ // built below to expand the transform would fail now that Read expands to a transform with a
+ // requirement. Opt back into the deprecated (non-SDF) Read to avoid that.
+ Pipeline p =
+ Pipeline.create(
+ PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create());
p.apply("unbounded", Create.of(1, 2, 3)).setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);