You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/08/15 20:15:05 UTC

[beam] branch master updated: [BEAM-10670] Make Read execute as a splittable DoFn by default for the Java DirectRunner.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e7c4ca  [BEAM-10670] Make Read execute as a splittable DoFn by default for the Java DirectRunner.
     new c720f2a  Merge pull request #12519 from lukecwik/beam10670
2e7c4ca is described below

commit 2e7c4ca207074ee98568be9469cadd141ad9cc6d
Author: Luke Cwik <lc...@google.com>
AuthorDate: Fri Aug 14 13:48:47 2020 -0700

    [BEAM-10670] Make Read execute as a splittable DoFn by default for the Java DirectRunner.
---
 CHANGES.md                                         |  28 ++
 .../runners/core/construction/SplittableParDo.java |  47 ++
 .../core/construction/SplittableParDoTest.java     |  81 ++++
 .../apache/beam/runners/direct/DirectRunner.java   |   1 +
 .../direct/ImmutabilityEnforcementFactory.java     |  58 +++
 .../direct/BoundedReadEvaluatorFactoryTest.java    |  10 +-
 .../runners/direct/DirectGraphVisitorTest.java     |  27 +-
 .../beam/runners/direct/DirectRunnerTest.java      |   1 -
 .../beam/runners/direct/EvaluationContextTest.java |  37 +-
 .../direct/UnboundedReadEvaluatorFactoryTest.java  |  34 +-
 .../beam/runners/direct/WatermarkManagerTest.java  | 505 +++++++++++----------
 .../src/main/java/org/apache/beam/sdk/io/Read.java |  37 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java    |  27 --
 .../org/apache/beam/sdk/io/TFRecordIOTest.java     |   4 -
 .../org/apache/beam/sdk/io/TextIOReadTest.java     |  26 --
 .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java    |  41 --
 .../gcp/bigquery/BigQueryIOStorageQueryTest.java   |  11 -
 .../io/gcp/bigquery/BigQueryIOStorageReadTest.java |  15 -
 .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java   |  30 --
 .../sdk/io/gcp/pubsub/PubsubIOExternalTest.java    |   8 +-
 20 files changed, 565 insertions(+), 463 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a4882b3..f310621 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -47,6 +47,34 @@
 * Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 -->
 
+# [2.25.0] - Unreleased
+
+## Highlights
+
+* Splittable DoFn is opt-out for Java based runners (DirectRunner) using `--experiments=use_deprecated_read`. For all other runners, users can opt-in using `--experiments=use_sdf_read`. (Java) ([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10135))
+* New highly anticipated feature X added to Python SDK ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).
+
+## I/Os
+
+* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## New Features / Improvements
+
+* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## Breaking Changes
+
+* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## Deprecations
+
+* X behavior is deprecated and will be removed in X versions ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
+## Known Issues
+
+* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+
 # [2.24.0] - Unreleased
 
 ## Highlights
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 3187112..acece88 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 
 import com.google.auto.service.AutoService;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -35,11 +36,16 @@ import org.apache.beam.runners.core.construction.PTransformTranslation.Transform
 import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoLike;
 import org.apache.beam.runners.core.construction.ReadTranslation.BoundedReadPayloadTranslator;
 import org.apache.beam.runners.core.construction.ReadTranslation.UnboundedReadPayloadTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -630,4 +636,45 @@ public class SplittableParDo<InputT, OutputT, RestrictionT, WatermarkEstimatorSt
       invoker = null;
     }
   }
+
+  /**
+   * Throws an {@link IllegalArgumentException} if the pipeline contains any primitive read
+   * transforms that have not been expanded to be executed as {@link DoFn splittable DoFns} as long
+   * as the experiment {@code use_deprecated_read} is not specified.
+   */
+  public static void validateNoPrimitiveReads(Pipeline pipeline) {
+    // TODO(BEAM-10670): Remove the deprecated Read and make the splittable DoFn the only option.
+    if (!(ExperimentalOptions.hasExperiment(
+            pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
+        || ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read"))) {
+
+      pipeline.traverseTopologically(new ValidateNoPrimitiveReads());
+    }
+  }
+
+  /**
+   * A {@link org.apache.beam.sdk.Pipeline.PipelineVisitor} that ensures that the pipeline does not
+   * contain any primitive reads.
+   */
+  private static class ValidateNoPrimitiveReads extends PipelineVisitor.Defaults {
+    public final List<PTransform<?, ?>> foundPrimitiveReads = new ArrayList<>();
+
+    @Override
+    public void visitPrimitiveTransform(Node node) {
+      if (node.getTransform() instanceof Read.Bounded
+          || node.getTransform() instanceof Read.Unbounded) {
+        foundPrimitiveReads.add(node.getTransform());
+      }
+    }
+
+    @Override
+    public void leavePipeline(Pipeline pipeline) {
+      if (!foundPrimitiveReads.isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Found primitive read transforms %s within the pipeline when only Splittable DoFns were expected. If you would like to use the deprecated behavior, please specify the experiment 'use_deprecated_read'. For example '--experiements=use_deprecated_read' on the command line.",
+                foundPrimitiveReads));
+      }
+    }
+  }
 }
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index 55c2dbf..65715ac 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -19,10 +19,23 @@ package org.apache.beam.runners.core.construction;
 
 import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.testing.CrashingRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -167,4 +180,72 @@ public class SplittableParDoTest {
                 "unbounded to unbounded", makeUnboundedCollection(pipeline), unboundedFn)
             .isBounded());
   }
+
+  private static class FakeBoundedSource extends BoundedSource<String> {
+    @Override
+    public List<? extends BoundedSource<String>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      return Collections.singletonList(this);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return 0;
+    }
+
+    @Override
+    public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Coder<String> getOutputCoder() {
+      return StringUtf8Coder.of();
+    }
+  }
+
+  @Test
+  public void testValidateNoPrimitiveReadsIsSkippedWhenUsingDeprecatedRead() {
+    PipelineOptions deprecatedReadOptions = PipelineOptionsFactory.create();
+    deprecatedReadOptions.setRunner(CrashingRunner.class);
+    ExperimentalOptions.addExperiment(
+        deprecatedReadOptions.as(ExperimentalOptions.class), "use_deprecated_read");
+
+    Pipeline deprecatedReadAllowed = Pipeline.create(deprecatedReadOptions);
+    deprecatedReadAllowed.apply(Read.from(new FakeBoundedSource()));
+    deprecatedReadAllowed.apply(
+        Read.from(new BoundedToUnboundedSourceAdapter<>(new FakeBoundedSource())));
+    // We expect that the experiment will skip validation.
+    SplittableParDo.validateNoPrimitiveReads(deprecatedReadAllowed);
+  }
+
+  @Test
+  public void testValidateNoPrimitiveReadsWhenThereAreNone() {
+    PipelineOptions sdfOptions = PipelineOptionsFactory.create();
+    sdfOptions.setRunner(CrashingRunner.class);
+    ExperimentalOptions.addExperiment(sdfOptions.as(ExperimentalOptions.class), "beam_fn_api");
+    Pipeline sdf = Pipeline.create(sdfOptions);
+    sdf.apply(Read.from(new FakeBoundedSource()));
+    sdf.apply(Read.from(new BoundedToUnboundedSourceAdapter<>(new FakeBoundedSource())));
+    // We expect that the experiment will have caused the transform to use SDF wrappers during
+    // execution.
+    SplittableParDo.validateNoPrimitiveReads(sdf);
+  }
+
+  @Test
+  public void testValidateNoPrimitiveReadsFindsPrimitiveReads() {
+    PipelineOptions noSdfOptions = PipelineOptionsFactory.create();
+    noSdfOptions.setRunner(CrashingRunner.class);
+    Pipeline boundedRead = Pipeline.create(noSdfOptions);
+    boundedRead.apply(Read.from(new FakeBoundedSource()));
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> SplittableParDo.validateNoPrimitiveReads(boundedRead));
+
+    Pipeline unboundedRead = Pipeline.create(noSdfOptions);
+    unboundedRead.apply(Read.from(new BoundedToUnboundedSourceAdapter<>(new FakeBoundedSource())));
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> SplittableParDo.validateNoPrimitiveReads(unboundedRead));
+  }
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index f7c74c0..d980588 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -179,6 +179,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
 
       DisplayDataValidator.validatePipeline(pipeline);
       DisplayDataValidator.validateOptions(options);
+      SplittableParDo.validateNoPrimitiveReads(pipeline);
 
       ExecutorService metricsPool =
           Executors.newCachedThreadPool(
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
index f5f0242..144318b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
@@ -19,9 +19,13 @@ package org.apache.beam.runners.direct;
 
 import java.util.IdentityHashMap;
 import java.util.Map;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.IllegalMutationException;
 import org.apache.beam.sdk.util.MutationDetector;
 import org.apache.beam.sdk.util.MutationDetectors;
@@ -40,9 +44,63 @@ class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
   @Override
   public <T> ModelEnforcement<T> forBundle(
       CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+    if (isReadTransform(consumer)) {
+      return NoopReadEnforcement.INSTANCE;
+    }
     return new ImmutabilityCheckingEnforcement<>(input, consumer);
   }
 
+  private static boolean isReadTransform(AppliedPTransform<?, ?, ?> consumer) {
+    IsReadVisitor visitor = new IsReadVisitor(consumer.getTransform());
+    consumer.getPipeline().traverseTopologically(visitor);
+    return visitor.isRead();
+  }
+
+  private static class IsReadVisitor extends PipelineVisitor.Defaults {
+    private final PTransform<?, ?> transform;
+    private boolean isRead;
+    private boolean isInsideRead;
+
+    private IsReadVisitor(PTransform<?, ?> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public CompositeBehavior enterCompositeTransform(Node node) {
+      if (node.getTransform() instanceof Read.Bounded
+          || node.getTransform() instanceof Read.Unbounded) {
+        isInsideRead = true;
+      }
+      if (isInsideRead && node.getTransform() == transform) {
+        isRead = true;
+      }
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    @Override
+    public void leaveCompositeTransform(Node node) {
+      if (node.getTransform() instanceof Read.Bounded
+          || node.getTransform() instanceof Read.Unbounded) {
+        isInsideRead = false;
+      }
+    }
+
+    @Override
+    public void visitPrimitiveTransform(Node node) {
+      if (isInsideRead && node.getTransform() == transform) {
+        isRead = true;
+      }
+    }
+
+    private boolean isRead() {
+      return isRead;
+    }
+  }
+
+  private static class NoopReadEnforcement<T> extends AbstractModelEnforcement<T> {
+    private static final NoopReadEnforcement INSTANCE = new NoopReadEnforcement<>();
+  }
+
   private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> {
     private final AppliedPTransform<?, ?, ?> transform;
     private final Map<WindowedValue<T>, MutationDetector> mutationElements;
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 52334cf..ccb1971 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -77,7 +77,12 @@ public class BoundedReadEvaluatorFactoryTest {
   private BundleFactory bundleFactory;
   private AppliedPTransform<?, ?, ?> longsProducer;
 
-  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  @Rule
+  public TestPipeline p =
+      TestPipeline.fromOptions(
+              PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create())
+          .enableAbandonedNodeEnforcement(false);
+
   private PipelineOptions options;
 
   @Before
@@ -86,10 +91,9 @@ public class BoundedReadEvaluatorFactoryTest {
     source = CountingSource.upTo(10L);
     longs = p.apply(Read.from(source));
 
-    options = PipelineOptionsFactory.create();
     factory =
         new BoundedReadEvaluatorFactory(
-            context, options, Long.MAX_VALUE /* minimum size for dynamic splits */);
+            context, p.getOptions(), Long.MAX_VALUE /* minimum size for dynamic splits */);
     bundleFactory = ImmutableListBundleFactory.create();
     longsProducer = DirectGraphs.getProducer(longs);
   }
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index c7536aa..b89e00a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -25,18 +25,17 @@ import static org.junit.Assert.assertThat;
 import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Flatten.PCollections;
+import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -86,24 +85,19 @@ public class DirectGraphVisitorTest implements Serializable {
 
   @Test
   public void getRootTransformsContainsRootTransforms() {
-    PCollection<String> created = p.apply(Create.of("foo", "bar"));
-    PCollection<Long> counted = p.apply(Read.from(CountingSource.upTo(1234L)));
-    PCollection<Long> unCounted = p.apply(GenerateSequence.from(0));
+    PCollection<byte[]> impulse = p.apply(Impulse.create());
+    impulse.apply(WithKeys.of("abc"));
     p.traverseTopologically(visitor);
     DirectGraph graph = visitor.getGraph();
-    assertThat(graph.getRootTransforms(), hasSize(3));
+    assertThat(graph.getRootTransforms(), hasSize(1));
     assertThat(
         graph.getRootTransforms(),
-        Matchers.containsInAnyOrder(
-            new Object[] {
-              graph.getProducer(created), graph.getProducer(counted), graph.getProducer(unCounted)
-            }));
+        Matchers.containsInAnyOrder(new Object[] {graph.getProducer(impulse)}));
     for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms()) {
       // Root transforms will have no inputs
       assertThat(root.getInputs().entrySet(), emptyIterable());
       assertThat(
-          Iterables.getOnlyElement(root.getOutputs().values()),
-          Matchers.<POutput>isOneOf(created, counted, unCounted));
+          Iterables.getOnlyElement(root.getOutputs().values()), Matchers.<POutput>isOneOf(impulse));
     }
   }
 
@@ -198,8 +192,11 @@ public class DirectGraphVisitorTest implements Serializable {
 
     p.traverseTopologically(visitor);
     DirectGraph graph = visitor.getGraph();
-    assertThat(graph.getStepName(graph.getProducer(created)), equalTo("s0"));
-    assertThat(graph.getStepName(graph.getProducer(transformed)), equalTo("s1"));
+    // Step names are of the format "s#" such as "s0", "s1", ...
+    int createdStepIndex =
+        Integer.parseInt(graph.getStepName(graph.getProducer(created)).substring(1));
+    assertThat(
+        graph.getStepName(graph.getProducer(transformed)), equalTo("s" + (createdStepIndex + 1)));
     // finished doesn't have a producer, because it's not a PValue.
     // TODO: Demonstrate that PCollectionList/Tuple and other composite PValues are either safe to
     // use, or make them so.
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index fbcf0c0..b493d2f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -601,7 +601,6 @@ public class DirectRunnerTest implements Serializable {
     Pipeline p = getPipeline();
     p.apply(GenerateSequence.from(0)).setCoder(new LongNoDecodeCoder());
 
-    thrown.expectCause(isA(CoderException.class));
     thrown.expectMessage("Cannot decode a long");
     p.run();
   }
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index aa1dd41..3e25b97 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -46,13 +46,12 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -79,10 +78,10 @@ import org.junit.runners.JUnit4;
 public class EvaluationContextTest {
   private EvaluationContext context;
 
-  private PCollection<Integer> created;
-  private PCollection<KV<String, Integer>> downstream;
-  private PCollectionView<Iterable<Integer>> view;
-  private PCollection<Long> unbounded;
+  private PCollection<byte[]> impulse;
+  private PCollection<KV<String, byte[]>> downstream;
+  private PCollectionView<Iterable<byte[]>> view;
+  private PCollection<byte[]> unbounded;
 
   private DirectGraph graph;
 
@@ -97,10 +96,10 @@ public class EvaluationContextTest {
   public void setup() {
     DirectRunner runner = DirectRunner.fromOptions(PipelineOptionsFactory.create());
 
-    created = p.apply(Create.of(1, 2, 3));
-    downstream = created.apply(WithKeys.of("foo"));
-    view = created.apply(View.asIterable());
-    unbounded = p.apply(GenerateSequence.from(0));
+    impulse = p.apply(Impulse.create());
+    downstream = impulse.apply(WithKeys.of("foo"));
+    view = impulse.apply(View.asIterable());
+    unbounded = p.apply(Impulse.create()).setIsBoundedInternal(IsBounded.UNBOUNDED);
 
     p.replaceAll(runner.defaultTransformOverrides());
 
@@ -118,7 +117,7 @@ public class EvaluationContextTest {
             keyedPValueTrackingVisitor.getKeyedPValues(),
             Executors.newSingleThreadExecutor());
 
-    createdProducer = graph.getProducer(created);
+    createdProducer = graph.getProducer(impulse);
     downstreamProducer = graph.getProducer(downstream);
     viewProducer = graph.getProducer(view);
     unboundedProducer = graph.getProducer(unbounded);
@@ -126,7 +125,7 @@ public class EvaluationContextTest {
 
   @Test
   public void writeToViewWriterThenReadReads() {
-    PCollectionViewWriter<?, Iterable<Integer>> viewWriter =
+    PCollectionViewWriter<?, Iterable<byte[]>> viewWriter =
         context.createPCollectionViewWriter(
             PCollection.createPrimitiveOutputInternal(
                 p,
@@ -187,7 +186,7 @@ public class EvaluationContextTest {
 
     context.handleResult(
         ImmutableListBundleFactory.create()
-            .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), created)
+            .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), impulse)
             .commit(Instant.now()),
         ImmutableList.of(),
         StepTransformResult.withoutHold(createdProducer)
@@ -262,7 +261,7 @@ public class EvaluationContextTest {
         StepTransformResult.withoutHold(downstreamProducer).withState(state).build();
 
     context.handleResult(
-        context.createKeyedBundle(myKey, created).commit(Instant.now()),
+        context.createKeyedBundle(myKey, impulse).commit(Instant.now()),
         ImmutableList.of(),
         stateResult);
 
@@ -330,7 +329,7 @@ public class EvaluationContextTest {
     // haven't added any timers, must be empty
     assertThat(context.extractFiredTimers(), emptyIterable());
     context.handleResult(
-        context.createKeyedBundle(key, created).commit(Instant.now()),
+        context.createKeyedBundle(key, impulse).commit(Instant.now()),
         ImmutableList.of(),
         timerResult);
 
@@ -355,7 +354,7 @@ public class EvaluationContextTest {
   @Test
   public void createKeyedBundleKeyed() {
     StructuralKey<String> key = StructuralKey.of("foo", StringUtf8Coder.of());
-    CommittedBundle<KV<String, Integer>> keyedBundle =
+    CommittedBundle<KV<String, byte[]>> keyedBundle =
         context.createKeyedBundle(key, downstream).commit(Instant.now());
     assertThat(keyedBundle.getKey(), equalTo(key));
   }
@@ -374,8 +373,8 @@ public class EvaluationContextTest {
   public void isDoneWithPartiallyDone() {
     assertThat(context.isDone(), is(false));
 
-    UncommittedBundle<Integer> rootBundle = context.createBundle(created);
-    rootBundle.add(WindowedValue.valueInGlobalWindow(1));
+    UncommittedBundle<byte[]> rootBundle = context.createBundle(impulse);
+    rootBundle.add(WindowedValue.valueInGlobalWindow(new byte[0]));
     CommittedResult handleResult =
         context.handleResult(
             null,
@@ -390,7 +389,7 @@ public class EvaluationContextTest {
         null, ImmutableList.of(), StepTransformResult.withoutHold(unboundedProducer).build());
     assertThat(context.isDone(), is(false));
 
-    for (AppliedPTransform<?, ?, ?> consumers : graph.getPerElementConsumers(created)) {
+    for (AppliedPTransform<?, ?, ?> consumers : graph.getPerElementConsumers(impulse)) {
       context.handleResult(
           committedBundle, ImmutableList.of(), StepTransformResult.withoutHold(consumers).build());
     }
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 4b257be..4033d3b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -106,17 +106,20 @@ public class UnboundedReadEvaluatorFactoryTest {
   private DirectGraph graph;
 
   @Rule public ExpectedException thrown = ExpectedException.none();
-  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
-  private PipelineOptions options;
+
+  @Rule
+  public TestPipeline p =
+      TestPipeline.fromOptions(
+              PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create())
+          .enableAbandonedNodeEnforcement(false);
 
   @Before
   public void setup() {
     source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
     longs = p.apply(Read.from(source));
-    options = PipelineOptionsFactory.create();
 
     context = mock(EvaluationContext.class);
-    factory = new UnboundedReadEvaluatorFactory(context, options);
+    factory = new UnboundedReadEvaluatorFactory(context, p.getOptions());
     output = bundleFactory.createBundle(longs);
     graph = DirectGraphs.getGraph(p);
     when(context.createBundle(longs)).thenReturn(output);
@@ -128,7 +131,7 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     int numSplits = 5;
     Collection<CommittedBundle<?>> initialInputs =
-        new UnboundedReadEvaluatorFactory.InputProvider(context, options)
+        new UnboundedReadEvaluatorFactory.InputProvider(context, p.getOptions())
             .getInitialInputs(graph.getProducer(longs), numSplits);
     // CountingSource.unbounded has very good splitting behavior
     assertThat(initialInputs, hasSize(numSplits));
@@ -161,7 +164,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
 
     Collection<CommittedBundle<?>> initialInputs =
-        new UnboundedReadEvaluatorFactory.InputProvider(context, options)
+        new UnboundedReadEvaluatorFactory.InputProvider(context, p.getOptions())
             .getInitialInputs(graph.getProducer(longs), 1);
 
     CommittedBundle<?> inputShards = Iterables.getOnlyElement(initialInputs);
@@ -202,7 +205,7 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     Collection<CommittedBundle<?>> initialInputs =
-        new UnboundedReadEvaluatorFactory.InputProvider(context, options)
+        new UnboundedReadEvaluatorFactory.InputProvider(context, p.getOptions())
             .getInitialInputs(sourceTransform, 1);
 
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
@@ -242,7 +245,7 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     Collection<CommittedBundle<?>> initialInputs =
-        new UnboundedReadEvaluatorFactory.InputProvider(context, options)
+        new UnboundedReadEvaluatorFactory.InputProvider(context, p.getOptions())
             .getInitialInputs(sourceTransform, 1);
 
     // Process the initial shard. This might produce some output, and will produce a residual shard
@@ -311,8 +314,8 @@ public class UnboundedReadEvaluatorFactoryTest {
             .add(shard)
             .commit(Instant.now());
     UnboundedReadEvaluatorFactory factory =
-        new UnboundedReadEvaluatorFactory(context, options, 1.0 /* Always reuse */);
-    new UnboundedReadEvaluatorFactory.InputProvider(context, options)
+        new UnboundedReadEvaluatorFactory(context, p.getOptions(), 1.0 /* Always reuse */);
+    new UnboundedReadEvaluatorFactory.InputProvider(context, p.getOptions())
         .getInitialInputs(sourceTransform, 1);
 
     CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual = inputBundle;
@@ -356,7 +359,7 @@ public class UnboundedReadEvaluatorFactoryTest {
             .add(shard)
             .commit(Instant.now());
     UnboundedReadEvaluatorFactory factory =
-        new UnboundedReadEvaluatorFactory(context, options, 0.0 /* never reuse */);
+        new UnboundedReadEvaluatorFactory(context, p.getOptions(), 0.0 /* never reuse */);
     TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
         factory.forApplication(sourceTransform, inputBundle);
     evaluator.processElement(shard);
@@ -402,7 +405,7 @@ public class UnboundedReadEvaluatorFactoryTest {
             .add(shard)
             .commit(Instant.now());
     UnboundedReadEvaluatorFactory factory =
-        new UnboundedReadEvaluatorFactory(context, options, 0.0 /* never reuse */);
+        new UnboundedReadEvaluatorFactory(context, p.getOptions(), 0.0 /* never reuse */);
     TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
         factory.forApplication(sourceTransform, inputBundle);
     thrown.expect(IOException.class);
@@ -438,10 +441,10 @@ public class UnboundedReadEvaluatorFactoryTest {
             emptySet(),
             Executors.newCachedThreadPool());
     final UnboundedReadEvaluatorFactory factory =
-        new UnboundedReadEvaluatorFactory(context, options);
+        new UnboundedReadEvaluatorFactory(context, p.getOptions());
 
     final Read.Unbounded<String> unbounded = Read.from(source);
-    final Pipeline pipeline = Pipeline.create(options);
+    final Pipeline pipeline = Pipeline.create(p.getOptions());
     final PCollection<String> pCollection = pipeline.apply(unbounded);
     final AppliedPTransform<PBegin, PCollection<String>, Read.Unbounded<String>> application =
         AppliedPTransform.of(
@@ -452,7 +455,8 @@ public class UnboundedReadEvaluatorFactoryTest {
             pipeline);
     final TransformEvaluator<UnboundedSourceShard<String, TestCheckpointMark>> evaluator =
         factory.forApplication(application, null);
-    final UnboundedSource.UnboundedReader<String> reader = source.createReader(options, null);
+    final UnboundedSource.UnboundedReader<String> reader =
+        source.createReader(p.getOptions(), null);
     final UnboundedSourceShard<String, TestCheckpointMark> shard =
         UnboundedSourceShard.of(source, new NeverDeduplicator(), reader, null);
     final WindowedValue<UnboundedSourceShard<String, TestCheckpointMark>> value =
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 6515e22..a18b339 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -51,10 +51,10 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -80,21 +80,26 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mockito;
 
-/** Tests for {@link WatermarkManager}. */
+/**
+ * Tests for {@link WatermarkManager}.
+ *
+ * <p>Note that the tests below output multiple elements from impulse for the purpose of watermark
+ * tracking while impulse would normally only output a single empty byte array in the global window.
+ */
 @RunWith(JUnit4.class)
 public class WatermarkManagerTest implements Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();
 
   private transient MockClock clock;
 
-  private transient PCollection<Integer> createdInts;
+  private transient PCollection<byte[]> impulse;
 
-  private transient PCollection<Integer> filtered;
-  private transient PCollection<Integer> filteredTimesTwo;
-  private transient PCollection<KV<String, Integer>> keyed;
+  private transient PCollection<byte[]> filtered;
+  private transient PCollection<byte[]> filteredNotEmpty;
+  private transient PCollection<KV<String, byte[]>> keyed;
 
-  private transient PCollection<Integer> intsToFlatten;
-  private transient PCollection<Integer> flattened;
+  private transient PCollection<byte[]> impulseToFlatten;
+  private transient PCollection<byte[]> flattened;
 
   private transient WatermarkManager<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> manager;
   private transient BundleFactory bundleFactory;
@@ -106,24 +111,25 @@ public class WatermarkManagerTest implements Serializable {
   @Before
   public void setup() {
 
-    createdInts = p.apply("createdInts", Create.of(1, 2, 3));
-
-    filtered = createdInts.apply("filtered", Filter.greaterThan(1));
-    filteredTimesTwo =
+    impulse = p.apply(Impulse.create());
+    filtered = impulse.apply("filtered", Filter.by((element) -> element.length == 0));
+    filteredNotEmpty =
         filtered.apply(
             "timesTwo",
             ParDo.of(
-                new DoFn<Integer, Integer>() {
+                new DoFn<byte[], byte[]>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) throws Exception {
-                    c.output(c.element() * 2);
+                    if (c.element().length > 0) {
+                      c.output(c.element());
+                    }
                   }
                 }));
 
-    keyed = createdInts.apply("keyed", WithKeys.of("MyKey"));
+    keyed = impulse.apply("keyed", WithKeys.of("MyKey"));
 
-    intsToFlatten = p.apply("intsToFlatten", Create.of(-1, 256, 65535));
-    PCollectionList<Integer> preFlatten = PCollectionList.of(createdInts).and(intsToFlatten);
+    impulseToFlatten = p.apply("impulseToFlatten", Impulse.create());
+    PCollectionList<byte[]> preFlatten = PCollectionList.of(impulse).and(impulseToFlatten);
     flattened = preFlatten.apply("flattened", Flatten.pCollections());
 
     clock = MockClock.fromInstant(new Instant(1000));
@@ -140,7 +146,7 @@ public class WatermarkManagerTest implements Serializable {
    */
   @Test
   public void getWatermarkForUntouchedTransform() {
-    TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(impulse));
 
     assertThat(watermarks.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
     assertThat(watermarks.getOutputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
@@ -152,17 +158,16 @@ public class WatermarkManagerTest implements Serializable {
    */
   @Test
   public void getWatermarkForUpdatedSourceTransform() {
-    CommittedBundle<Integer> output = multiWindowedBundle(createdInts, 1);
+    CommittedBundle<byte[]> output = multiWindowedBundle(impulse, new byte[1]);
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.singleton(output),
         new Instant(8000L));
     manager.refreshAll();
-    TransformWatermarks updatedSourceWatermark =
-        manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks updatedSourceWatermark = manager.getWatermarks(graph.getProducer(impulse));
 
     assertThat(updatedSourceWatermark.getOutputWatermark(), equalTo(new Instant(8000L)));
   }
@@ -173,20 +178,20 @@ public class WatermarkManagerTest implements Serializable {
    */
   @Test
   public void getWatermarkForMultiInputTransform() {
-    CommittedBundle<Integer> secondPcollectionBundle = multiWindowedBundle(intsToFlatten, -1);
+    CommittedBundle<byte[]> secondPcollectionBundle =
+        multiWindowedBundle(impulseToFlatten, new byte[1]);
 
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(intsToFlatten),
+        graph.getProducer(impulseToFlatten),
         null,
         Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
 
     // We didn't do anything for the first source, so we shouldn't have progressed the watermark
-    TransformWatermarks firstSourceWatermark =
-        manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks firstSourceWatermark = manager.getWatermarks(graph.getProducer(impulse));
     assertThat(
         firstSourceWatermark.getOutputWatermark(),
         not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
@@ -194,7 +199,7 @@ public class WatermarkManagerTest implements Serializable {
     // the Second Source output all of the elements so it should be done (with a watermark at the
     // end of time).
     TransformWatermarks secondSourceWatermark =
-        manager.getWatermarks(graph.getProducer(intsToFlatten));
+        manager.getWatermarks(graph.getProducer(impulseToFlatten));
     assertThat(
         secondSourceWatermark.getOutputWatermark(),
         not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -208,7 +213,8 @@ public class WatermarkManagerTest implements Serializable {
         transformWatermark.getOutputWatermark(),
         not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
 
-    CommittedBundle<Integer> flattenedBundleSecondCreate = multiWindowedBundle(flattened, -1);
+    CommittedBundle<byte[]> flattenedBundleSecondCreate =
+        multiWindowedBundle(flattened, new byte[1]);
     // We have finished processing the bundle from the second PCollection, but we haven't consumed
     // anything from the first PCollection yet; so our watermark shouldn't advance
     manager.updateWatermarks(
@@ -236,20 +242,19 @@ public class WatermarkManagerTest implements Serializable {
         not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
 
     Instant firstCollectionTimestamp = new Instant(10000);
-    CommittedBundle<Integer> firstPcollectionBundle =
-        timestampedBundle(createdInts, TimestampedValue.of(5, firstCollectionTimestamp));
+    CommittedBundle<byte[]> firstPcollectionBundle =
+        timestampedBundle(impulse, TimestampedValue.of(new byte[5], firstCollectionTimestamp));
     // the source is done, but elements are still buffered. The source output watermark should be
     // past the end of the global window
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
-    TransformWatermarks firstSourceWatermarks =
-        manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks firstSourceWatermarks = manager.getWatermarks(graph.getProducer(impulse));
     assertThat(
         firstSourceWatermarks.getOutputWatermark(),
         not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -298,9 +303,9 @@ public class WatermarkManagerTest implements Serializable {
    */
   @Test
   public void getWatermarkMultiIdenticalInput() {
-    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
-    PCollection<Integer> multiConsumer =
-        PCollectionList.of(created).and(created).apply(Flatten.pCollections());
+    PCollection<byte[]> impulse = p.apply(Impulse.create());
+    PCollection<byte[]> multiConsumer =
+        PCollectionList.of(impulse).and(impulse).apply(Flatten.pCollections());
     DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
     p.traverseTopologically(graphVisitor);
     DirectGraph graph = graphVisitor.getGraph();
@@ -314,21 +319,21 @@ public class WatermarkManagerTest implements Serializable {
             .<Void>createRootBundle()
             .add(WindowedValue.valueInGlobalWindow(null))
             .commit(clock.now());
-    CommittedBundle<Integer> createBundle =
+    CommittedBundle<byte[]> createBundle =
         bundleFactory
-            .createBundle(created)
-            .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(33536)))
+            .createBundle(impulse)
+            .add(WindowedValue.timestampedValueInGlobalWindow(new byte[1], new Instant(33536)))
             .commit(clock.now());
 
     Map<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>> initialInputs =
         ImmutableMap.<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>>builder()
-            .put(graph.getProducer(created), Collections.singleton(root))
+            .put(graph.getProducer(impulse), Collections.singleton(root))
             .build();
     tstMgr.initialize((Map) initialInputs);
     tstMgr.updateWatermarks(
         root,
         TimerUpdate.empty(),
-        graph.getProducer(created),
+        graph.getProducer(impulse),
         null,
         Collections.singleton(createBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -365,37 +370,36 @@ public class WatermarkManagerTest implements Serializable {
    */
   @Test
   public void getWatermarkForMultiConsumedCollection() {
-    CommittedBundle<Integer> createdBundle =
+    CommittedBundle<byte[]> impulseBundle =
         timestampedBundle(
-            createdInts,
-            TimestampedValue.of(1, new Instant(1_000_000L)),
-            TimestampedValue.of(2, new Instant(1234L)),
-            TimestampedValue.of(3, new Instant(-1000L)));
+            impulse,
+            TimestampedValue.of(new byte[1], new Instant(1_000_000L)),
+            TimestampedValue.of(new byte[2], new Instant(1234L)),
+            TimestampedValue.of(new byte[3], new Instant(-1000L)));
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
+        Collections.<CommittedBundle<?>>singleton(impulseBundle),
         new Instant(Long.MAX_VALUE));
     manager.refreshAll();
-    TransformWatermarks createdAfterProducing =
-        manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks impulseAfterProducing = manager.getWatermarks(graph.getProducer(impulse));
     assertThat(
-        createdAfterProducing.getOutputWatermark(),
+        impulseAfterProducing.getOutputWatermark(),
         not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
 
-    CommittedBundle<KV<String, Integer>> keyBundle =
+    CommittedBundle<KV<String, byte[]>> keyBundle =
         timestampedBundle(
             keyed,
-            TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
-            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
-            TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+            TimestampedValue.of(KV.of("MyKey", new byte[1]), new Instant(1_000_000L)),
+            TimestampedValue.of(KV.of("MyKey", new byte[2]), new Instant(1234L)),
+            TimestampedValue.of(KV.of("MyKey", new byte[3]), new Instant(-1000L)));
     manager.updateWatermarks(
-        createdBundle,
+        impulseBundle,
         TimerUpdate.empty(),
         graph.getProducer(keyed),
-        createdBundle.withElements(Collections.emptyList()),
+        impulseBundle.withElements(Collections.emptyList()),
         Collections.<CommittedBundle<?>>singleton(keyBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
@@ -409,13 +413,13 @@ public class WatermarkManagerTest implements Serializable {
     assertThat(filteredWatermarks.getInputWatermark(), not(greaterThan(new Instant(-1000L))));
     assertThat(filteredWatermarks.getOutputWatermark(), not(greaterThan(new Instant(-1000L))));
 
-    CommittedBundle<Integer> filteredBundle =
-        timestampedBundle(filtered, TimestampedValue.of(2, new Instant(1234L)));
+    CommittedBundle<byte[]> filteredBundle =
+        timestampedBundle(filtered, TimestampedValue.of(new byte[2], new Instant(1234L)));
     manager.updateWatermarks(
-        createdBundle,
+        impulseBundle,
         TimerUpdate.empty(),
         graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
+        impulseBundle.withElements(Collections.emptyList()),
         Collections.<CommittedBundle<?>>singleton(filteredBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
@@ -435,31 +439,31 @@ public class WatermarkManagerTest implements Serializable {
    */
   @Test
   public void updateWatermarkWithWatermarkHolds() {
-    CommittedBundle<Integer> createdBundle =
+    CommittedBundle<byte[]> impulseBundle =
         timestampedBundle(
-            createdInts,
-            TimestampedValue.of(1, new Instant(1_000_000L)),
-            TimestampedValue.of(2, new Instant(1234L)),
-            TimestampedValue.of(3, new Instant(-1000L)));
+            impulse,
+            TimestampedValue.of(new byte[1], new Instant(1_000_000L)),
+            TimestampedValue.of(new byte[2], new Instant(1234L)),
+            TimestampedValue.of(new byte[3], new Instant(-1000L)));
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
+        Collections.<CommittedBundle<?>>singleton(impulseBundle),
         new Instant(Long.MAX_VALUE));
 
-    CommittedBundle<KV<String, Integer>> keyBundle =
+    CommittedBundle<KV<String, byte[]>> keyBundle =
         timestampedBundle(
             keyed,
-            TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
-            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
-            TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+            TimestampedValue.of(KV.of("MyKey", new byte[1]), new Instant(1_000_000L)),
+            TimestampedValue.of(KV.of("MyKey", new byte[2]), new Instant(1234L)),
+            TimestampedValue.of(KV.of("MyKey", new byte[3]), new Instant(-1000L)));
     manager.updateWatermarks(
-        createdBundle,
+        impulseBundle,
         TimerUpdate.empty(),
         graph.getProducer(keyed),
-        createdBundle.withElements(Collections.emptyList()),
+        impulseBundle.withElements(Collections.emptyList()),
         Collections.<CommittedBundle<?>>singleton(keyBundle),
         new Instant(500L));
     manager.refreshAll();
@@ -475,23 +479,23 @@ public class WatermarkManagerTest implements Serializable {
    */
   @Test
   public void updateWatermarkWithKeyedWatermarkHolds() {
-    CommittedBundle<Integer> firstKeyBundle =
+    CommittedBundle<byte[]> firstKeyBundle =
         bundleFactory
-            .createKeyedBundle(StructuralKey.of("Odd", StringUtf8Coder.of()), createdInts)
-            .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L)))
-            .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L)))
+            .createKeyedBundle(StructuralKey.of("Odd", StringUtf8Coder.of()), impulse)
+            .add(WindowedValue.timestampedValueInGlobalWindow(new byte[1], new Instant(1_000_000L)))
+            .add(WindowedValue.timestampedValueInGlobalWindow(new byte[3], new Instant(-1000L)))
             .commit(clock.now());
 
-    CommittedBundle<Integer> secondKeyBundle =
+    CommittedBundle<byte[]> secondKeyBundle =
         bundleFactory
-            .createKeyedBundle(StructuralKey.of("Even", StringUtf8Coder.of()), createdInts)
-            .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L)))
+            .createKeyedBundle(StructuralKey.of("Even", StringUtf8Coder.of()), impulse)
+            .add(WindowedValue.timestampedValueInGlobalWindow(new byte[2], new Instant(1234L)))
             .commit(clock.now());
 
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         ImmutableList.of(firstKeyBundle, secondKeyBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -517,9 +521,9 @@ public class WatermarkManagerTest implements Serializable {
         filteredWatermarks.getInputWatermark(), not(lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
     assertThat(filteredWatermarks.getOutputWatermark(), not(greaterThan(new Instant(-1000L))));
 
-    CommittedBundle<Integer> fauxFirstKeyTimerBundle =
+    CommittedBundle<byte[]> fauxFirstKeyTimerBundle =
         bundleFactory
-            .createKeyedBundle(StructuralKey.of("Odd", StringUtf8Coder.of()), createdInts)
+            .createKeyedBundle(StructuralKey.of("Odd", StringUtf8Coder.of()), impulse)
             .commit(clock.now());
     manager.updateWatermarks(
         fauxFirstKeyTimerBundle,
@@ -532,9 +536,9 @@ public class WatermarkManagerTest implements Serializable {
 
     assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L)));
 
-    CommittedBundle<Integer> fauxSecondKeyTimerBundle =
+    CommittedBundle<byte[]> fauxSecondKeyTimerBundle =
         bundleFactory
-            .createKeyedBundle(StructuralKey.of("Even", StringUtf8Coder.of()), createdInts)
+            .createKeyedBundle(StructuralKey.of("Even", StringUtf8Coder.of()), impulse)
             .commit(clock.now());
     manager.updateWatermarks(
         fauxSecondKeyTimerBundle,
@@ -565,29 +569,29 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void updateOutputWatermarkShouldBeMonotonic() {
     CommittedBundle<?> firstInput =
-        bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        bundleFactory.createBundle(impulse).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.<CommittedBundle<?>>singleton(firstInput),
         new Instant(0L));
     manager.refreshAll();
-    TransformWatermarks firstWatermarks = manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks firstWatermarks = manager.getWatermarks(graph.getProducer(impulse));
     assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L)));
 
     CommittedBundle<?> secondInput =
-        bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        bundleFactory.createBundle(impulse).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.<CommittedBundle<?>>singleton(secondInput),
         new Instant(-250L));
     manager.refreshAll();
-    TransformWatermarks secondWatermarks = manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks secondWatermarks = manager.getWatermarks(graph.getProducer(impulse));
     assertThat(secondWatermarks.getOutputWatermark(), not(lessThan(new Instant(0L))));
   }
 
@@ -597,31 +601,31 @@ public class WatermarkManagerTest implements Serializable {
    */
   @Test
   public void updateWatermarkWithHoldsShouldBeMonotonic() {
-    CommittedBundle<Integer> createdBundle =
+    CommittedBundle<byte[]> impulseBundle =
         timestampedBundle(
-            createdInts,
-            TimestampedValue.of(1, new Instant(1_000_000L)),
-            TimestampedValue.of(2, new Instant(1234L)),
-            TimestampedValue.of(3, new Instant(-1000L)));
+            impulse,
+            TimestampedValue.of(new byte[1], new Instant(1_000_000L)),
+            TimestampedValue.of(new byte[2], new Instant(1234L)),
+            TimestampedValue.of(new byte[3], new Instant(-1000L)));
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
+        Collections.<CommittedBundle<?>>singleton(impulseBundle),
         new Instant(Long.MAX_VALUE));
 
-    CommittedBundle<KV<String, Integer>> keyBundle =
+    CommittedBundle<KV<String, byte[]>> keyBundle =
         timestampedBundle(
             keyed,
-            TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
-            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
-            TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+            TimestampedValue.of(KV.of("MyKey", new byte[1]), new Instant(1_000_000L)),
+            TimestampedValue.of(KV.of("MyKey", new byte[2]), new Instant(1234L)),
+            TimestampedValue.of(KV.of("MyKey", new byte[3]), new Instant(-1000L)));
     manager.updateWatermarks(
-        createdBundle,
+        impulseBundle,
         TimerUpdate.empty(),
         graph.getProducer(keyed),
-        createdBundle.withElements(Collections.emptyList()),
+        impulseBundle.withElements(Collections.emptyList()),
         Collections.<CommittedBundle<?>>singleton(keyBundle),
         new Instant(500L));
     manager.refreshAll();
@@ -641,35 +645,31 @@ public class WatermarkManagerTest implements Serializable {
 
   @Test
   public void updateWatermarkWithUnprocessedElements() {
-    WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(1);
-    WindowedValue<Integer> second =
-        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-1000L));
-    WindowedValue<Integer> third =
-        WindowedValue.timestampedValueInGlobalWindow(3, new Instant(1234L));
-    CommittedBundle<Integer> createdBundle =
-        bundleFactory
-            .createBundle(createdInts)
-            .add(first)
-            .add(second)
-            .add(third)
-            .commit(clock.now());
+    WindowedValue<byte[]> first = WindowedValue.valueInGlobalWindow(new byte[1]);
+    WindowedValue<byte[]> second =
+        WindowedValue.timestampedValueInGlobalWindow(new byte[2], new Instant(-1000L));
+    WindowedValue<byte[]> third =
+        WindowedValue.timestampedValueInGlobalWindow(new byte[3], new Instant(1234L));
+    CommittedBundle<byte[]> impulseBundle =
+        bundleFactory.createBundle(impulse).add(first).add(second).add(third).commit(clock.now());
 
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
+        Collections.<CommittedBundle<?>>singleton(impulseBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
-    CommittedBundle<KV<String, Integer>> keyBundle =
+    CommittedBundle<KV<String, byte[]>> keyBundle =
         timestampedBundle(
-            keyed, TimestampedValue.of(KV.of("MyKey", 1), BoundedWindow.TIMESTAMP_MIN_VALUE));
+            keyed,
+            TimestampedValue.of(KV.of("MyKey", new byte[1]), BoundedWindow.TIMESTAMP_MIN_VALUE));
     manager.updateWatermarks(
-        createdBundle,
+        impulseBundle,
         TimerUpdate.empty(),
         graph.getProducer(keyed),
-        createdBundle.withElements(ImmutableList.of(second, third)),
+        impulseBundle.withElements(ImmutableList.of(second, third)),
         Collections.<CommittedBundle<?>>singleton(keyBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks keyedWatermarks = manager.getWatermarks(graph.getProducer(keyed));
@@ -679,28 +679,29 @@ public class WatermarkManagerTest implements Serializable {
 
   @Test
   public void updateWatermarkWithCompletedElementsNotPending() {
-    WindowedValue<Integer> first = WindowedValue.timestampedValueInGlobalWindow(1, new Instant(22));
-    CommittedBundle<Integer> createdBundle =
-        bundleFactory.createBundle(createdInts).add(first).commit(clock.now());
+    WindowedValue<byte[]> first =
+        WindowedValue.timestampedValueInGlobalWindow(new byte[1], new Instant(22));
+    CommittedBundle<byte[]> impulseBundle =
+        bundleFactory.createBundle(impulse).add(first).commit(clock.now());
 
-    WindowedValue<Integer> second =
-        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(22));
-    CommittedBundle<Integer> neverCreatedBundle =
-        bundleFactory.createBundle(createdInts).add(second).commit(clock.now());
+    WindowedValue<byte[]> second =
+        WindowedValue.timestampedValueInGlobalWindow(new byte[2], new Instant(22));
+    CommittedBundle<byte[]> neverImpulseBundle =
+        bundleFactory.createBundle(impulse).add(second).commit(clock.now());
 
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
+        Collections.<CommittedBundle<?>>singleton(impulseBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     manager.updateWatermarks(
-        neverCreatedBundle,
+        neverImpulseBundle,
         TimerUpdate.empty(),
         graph.getProducer(filtered),
-        neverCreatedBundle.withElements(Collections.emptyList()),
+        neverImpulseBundle.withElements(Collections.emptyList()),
         Collections.emptyList(),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -713,32 +714,32 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void updateWatermarkWithLateData() {
     Instant sourceWatermark = new Instant(1_000_000L);
-    CommittedBundle<Integer> createdBundle =
+    CommittedBundle<byte[]> impulseBundle =
         timestampedBundle(
-            createdInts,
-            TimestampedValue.of(1, sourceWatermark),
-            TimestampedValue.of(2, new Instant(1234L)));
+            impulse,
+            TimestampedValue.of(new byte[1], sourceWatermark),
+            TimestampedValue.of(new byte[2], new Instant(1234L)));
 
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
+        Collections.<CommittedBundle<?>>singleton(impulseBundle),
         sourceWatermark);
 
-    CommittedBundle<KV<String, Integer>> keyBundle =
+    CommittedBundle<KV<String, byte[]>> keyBundle =
         timestampedBundle(
             keyed,
-            TimestampedValue.of(KV.of("MyKey", 1), sourceWatermark),
-            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)));
+            TimestampedValue.of(KV.of("MyKey", new byte[1]), sourceWatermark),
+            TimestampedValue.of(KV.of("MyKey", new byte[2]), new Instant(1234L)));
 
     // Finish processing the on-time data. The watermarks should progress to be equal to the source
     manager.updateWatermarks(
-        createdBundle,
+        impulseBundle,
         TimerUpdate.empty(),
         graph.getProducer(keyed),
-        createdBundle.withElements(Collections.emptyList()),
+        impulseBundle.withElements(Collections.emptyList()),
         Collections.<CommittedBundle<?>>singleton(keyBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
@@ -746,19 +747,19 @@ public class WatermarkManagerTest implements Serializable {
     assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark));
     assertThat(onTimeWatermarks.getOutputWatermark(), equalTo(sourceWatermark));
 
-    CommittedBundle<Integer> lateDataBundle =
-        timestampedBundle(createdInts, TimestampedValue.of(3, new Instant(-1000L)));
+    CommittedBundle<byte[]> lateDataBundle =
+        timestampedBundle(impulse, TimestampedValue.of(new byte[3], new Instant(-1000L)));
     // the late data arrives in a downstream PCollection after its watermark has advanced past it;
     // we don't advance the watermark past the current watermark until we've consumed the late data
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
-        createdBundle.withElements(Collections.emptyList()),
+        graph.getProducer(impulse),
+        impulseBundle.withElements(Collections.emptyList()),
         Collections.<CommittedBundle<?>>singleton(lateDataBundle),
         new Instant(2_000_000L));
     manager.refreshAll();
-    TransformWatermarks bufferedLateWm = manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks bufferedLateWm = manager.getWatermarks(graph.getProducer(impulse));
     assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new Instant(2_000_000L)));
 
     // The input watermark should be held to its previous value (not advanced due to late data; not
@@ -767,8 +768,9 @@ public class WatermarkManagerTest implements Serializable {
     assertThat(lateDataBufferedWatermark.getInputWatermark(), not(lessThan(sourceWatermark)));
     assertThat(lateDataBufferedWatermark.getOutputWatermark(), not(lessThan(sourceWatermark)));
 
-    CommittedBundle<KV<String, Integer>> lateKeyedBundle =
-        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+    CommittedBundle<KV<String, byte[]>> lateKeyedBundle =
+        timestampedBundle(
+            keyed, TimestampedValue.of(KV.of("MyKey", new byte[3]), new Instant(-1000L)));
     manager.updateWatermarks(
         lateDataBundle,
         TimerUpdate.empty(),
@@ -785,25 +787,25 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.<CommittedBundle<?>>singleton(
             bundleFactory
-                .createBundle(createdInts)
-                .add(WindowedValue.valueInGlobalWindow(1))
+                .createBundle(impulse)
+                .add(WindowedValue.valueInGlobalWindow(new byte[1]))
                 .commit(Instant.now())),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
-    CommittedBundle<Integer> createdBundle =
+    CommittedBundle<byte[]> impulseBundle =
         bundleFactory
-            .createBundle(createdInts)
-            .add(WindowedValue.valueInGlobalWindow(1))
+            .createBundle(impulse)
+            .add(WindowedValue.valueInGlobalWindow(new byte[1]))
             .commit(Instant.now());
     manager.updateWatermarks(
-        createdBundle,
+        impulseBundle,
         TimerUpdate.empty(),
         graph.getProducer(keyed),
-        createdBundle.withElements(Collections.emptyList()),
+        impulseBundle.withElements(Collections.emptyList()),
         Collections.emptyList(),
         null);
     manager.refreshAll();
@@ -817,17 +819,16 @@ public class WatermarkManagerTest implements Serializable {
    */
   @Test
   public void getWatermarksAfterOnlyEmptyOutput() {
-    CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
+    CommittedBundle<byte[]> emptyCreateOutput = multiWindowedBundle(impulse);
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
-    TransformWatermarks updatedSourceWatermarks =
-        manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(graph.getProducer(impulse));
 
     assertThat(
         updatedSourceWatermarks.getOutputWatermark(),
@@ -849,21 +850,22 @@ public class WatermarkManagerTest implements Serializable {
    */
   @Test
   public void getWatermarksAfterHoldAndEmptyOutput() {
-    CommittedBundle<Integer> firstCreateOutput = multiWindowedBundle(createdInts, 1, 2);
+    CommittedBundle<byte[]> firstImpulseOutput =
+        multiWindowedBundle(impulse, new byte[1], new byte[2]);
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.<CommittedBundle<?>>singleton(firstCreateOutput),
+        Collections.<CommittedBundle<?>>singleton(firstImpulseOutput),
         new Instant(12_000L));
 
-    CommittedBundle<Integer> firstFilterOutput = multiWindowedBundle(filtered);
+    CommittedBundle<byte[]> firstFilterOutput = multiWindowedBundle(filtered);
     manager.updateWatermarks(
-        firstCreateOutput,
+        firstImpulseOutput,
         TimerUpdate.empty(),
         graph.getProducer(filtered),
-        firstCreateOutput.withElements(Collections.emptyList()),
+        firstImpulseOutput.withElements(Collections.emptyList()),
         Collections.<CommittedBundle<?>>singleton(firstFilterOutput),
         new Instant(10_000L));
     manager.refreshAll();
@@ -871,17 +873,16 @@ public class WatermarkManagerTest implements Serializable {
     assertThat(firstFilterWatermarks.getInputWatermark(), not(lessThan(new Instant(12_000L))));
     assertThat(firstFilterWatermarks.getOutputWatermark(), not(greaterThan(new Instant(10_000L))));
 
-    CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
+    CommittedBundle<byte[]> emptyImpulseOutput = multiWindowedBundle(impulse);
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
+        Collections.<CommittedBundle<?>>singleton(emptyImpulseOutput),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
-    TransformWatermarks updatedSourceWatermarks =
-        manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(graph.getProducer(impulse));
 
     assertThat(
         updatedSourceWatermarks.getOutputWatermark(),
@@ -898,7 +899,7 @@ public class WatermarkManagerTest implements Serializable {
 
   @Test
   public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
-    TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(impulse));
     assertThat(watermarks.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
     assertThat(
         watermarks.getSynchronizedProcessingOutputTime(),
@@ -913,18 +914,18 @@ public class WatermarkManagerTest implements Serializable {
         filteredWatermarks.getSynchronizedProcessingOutputTime(),
         not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
 
-    CommittedBundle<Integer> createOutput =
-        bundleFactory.createBundle(createdInts).commit(new Instant(1250L));
+    CommittedBundle<byte[]> createOutput =
+        bundleFactory.createBundle(impulse).commit(new Instant(1250L));
 
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.<CommittedBundle<?>>singleton(createOutput),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
-    TransformWatermarks createAfterUpdate = manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks createAfterUpdate = manager.getWatermarks(graph.getProducer(impulse));
     assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
     assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
 
@@ -945,7 +946,7 @@ public class WatermarkManagerTest implements Serializable {
         not(greaterThan(new Instant(1250L))));
 
     CommittedBundle<?> filterOutputBundle =
-        bundleFactory.createBundle(intsToFlatten).commit(new Instant(1250L));
+        bundleFactory.createBundle(impulseToFlatten).commit(new Instant(1250L));
     manager.updateWatermarks(
         createOutput,
         TimerUpdate.empty(),
@@ -971,24 +972,26 @@ public class WatermarkManagerTest implements Serializable {
    */
   @Test
   public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
-    CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8);
+    CommittedBundle<byte[]> impulseBundle =
+        multiWindowedBundle(impulse, new byte[1], new byte[2], new byte[4], new byte[8]);
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.<CommittedBundle<?>>singleton(createdBundle),
+        Collections.<CommittedBundle<?>>singleton(impulseBundle),
         new Instant(1248L));
     manager.refreshAll();
 
     TransformWatermarks filteredWms = manager.getWatermarks(graph.getProducer(filtered));
     TransformWatermarks filteredDoubledWms =
-        manager.getWatermarks(graph.getProducer(filteredTimesTwo));
+        manager.getWatermarks(graph.getProducer(filteredNotEmpty));
     Instant initialFilteredWm = filteredWms.getSynchronizedProcessingOutputTime();
     Instant initialFilteredDoubledWm = filteredDoubledWms.getSynchronizedProcessingOutputTime();
 
     StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of());
-    CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 8);
+    CommittedBundle<byte[]> filteredBundle =
+        multiWindowedBundle(filtered, new byte[2], new byte[8]);
     TimerData pastTimer =
         TimerData.of(
             StateNamespaces.global(),
@@ -1003,10 +1006,10 @@ public class WatermarkManagerTest implements Serializable {
             TimeDomain.PROCESSING_TIME);
     TimerUpdate timers = TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build();
     manager.updateWatermarks(
-        createdBundle,
+        impulseBundle,
         timers,
         graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
+        impulseBundle.withElements(Collections.emptyList()),
         Collections.<CommittedBundle<?>>singleton(filteredBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
@@ -1029,11 +1032,11 @@ public class WatermarkManagerTest implements Serializable {
     assertThat(
         filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(greaterThan(startTime)));
 
-    CommittedBundle<Integer> filteredTimerBundle =
+    CommittedBundle<byte[]> filteredTimerBundle =
         bundleFactory.createKeyedBundle(key, filtered).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    CommittedBundle<Integer> filteredTimerResult =
+    CommittedBundle<byte[]> filteredTimerResult =
         bundleFactory
-            .createKeyedBundle(key, filteredTimesTwo)
+            .createKeyedBundle(key, filteredNotEmpty)
             .commit(filteredWms.getSynchronizedProcessingOutputTime());
     // Complete the processing time timer
     manager.updateWatermarks(
@@ -1055,7 +1058,7 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(
         filteredTimerResult,
         TimerUpdate.empty(),
-        graph.getProducer(filteredTimesTwo),
+        graph.getProducer(filteredNotEmpty),
         filteredTimerResult.withElements(Collections.emptyList()),
         Collections.emptyList(),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1075,7 +1078,7 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() {
     Instant startTime = clock.now();
-    TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks watermarks = manager.getWatermarks(graph.getProducer(impulse));
     assertThat(watermarks.getSynchronizedProcessingInputTime(), equalTo(startTime));
 
     TransformWatermarks filteredWatermarks = manager.getWatermarks(graph.getProducer(filtered));
@@ -1087,29 +1090,29 @@ public class WatermarkManagerTest implements Serializable {
         filteredWatermarks.getSynchronizedProcessingOutputTime(),
         not(greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
 
-    CommittedBundle<Integer> createOutput =
-        bundleFactory.createBundle(createdInts).commit(new Instant(1250L));
+    CommittedBundle<byte[]> createOutput =
+        bundleFactory.createBundle(impulse).commit(new Instant(1250L));
 
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.<CommittedBundle<?>>singleton(createOutput),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
-    TransformWatermarks createAfterUpdate = manager.getWatermarks(graph.getProducer(createdInts));
+    TransformWatermarks createAfterUpdate = manager.getWatermarks(graph.getProducer(impulse));
     assertThat(
         createAfterUpdate.getSynchronizedProcessingInputTime(), not(greaterThan(clock.now())));
     assertThat(
         createAfterUpdate.getSynchronizedProcessingOutputTime(), not(greaterThan(clock.now())));
 
-    CommittedBundle<Integer> createSecondOutput =
-        bundleFactory.createBundle(createdInts).commit(new Instant(750L));
+    CommittedBundle<byte[]> createSecondOutput =
+        bundleFactory.createBundle(impulse).commit(new Instant(750L));
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.<CommittedBundle<?>>singleton(createSecondOutput),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1120,33 +1123,35 @@ public class WatermarkManagerTest implements Serializable {
 
   @Test
   public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() {
-    CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3);
+    CommittedBundle<byte[]> impulseBundle =
+        multiWindowedBundle(impulse, new byte[1], new byte[2], new byte[3]);
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.<CommittedBundle<?>>singleton(created),
+        Collections.<CommittedBundle<?>>singleton(impulseBundle),
         new Instant(40_900L));
     manager.refreshAll();
 
-    CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 4);
+    CommittedBundle<byte[]> filteredBundle =
+        multiWindowedBundle(filtered, new byte[2], new byte[4]);
     Instant upstreamHold = new Instant(2048L);
     TimerData upstreamProcessingTimer =
         TimerData.of(
             StateNamespaces.global(), upstreamHold, upstreamHold, TimeDomain.PROCESSING_TIME);
     manager.updateWatermarks(
-        created,
+        impulseBundle,
         TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
             .setTimer(upstreamProcessingTimer)
             .build(),
         graph.getProducer(filtered),
-        created.withElements(Collections.emptyList()),
+        impulseBundle.withElements(Collections.emptyList()),
         Collections.<CommittedBundle<?>>singleton(filteredBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
 
-    TransformWatermarks downstreamWms = manager.getWatermarks(graph.getProducer(filteredTimesTwo));
+    TransformWatermarks downstreamWms = manager.getWatermarks(graph.getProducer(filteredNotEmpty));
     assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
 
     clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1157,14 +1162,15 @@ public class WatermarkManagerTest implements Serializable {
     // synchronized processing time
     assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(upstreamHold));
 
-    CommittedBundle<Integer> otherCreated = multiWindowedBundle(createdInts, 4, 8, 12);
+    CommittedBundle<byte[]> otherImpulse =
+        multiWindowedBundle(impulse, new byte[4], new byte[8], new byte[12]);
     manager.updateWatermarks(
-        otherCreated,
+        otherImpulse,
         TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
             .withCompletedTimers(Collections.singleton(upstreamProcessingTimer))
             .build(),
         graph.getProducer(filtered),
-        otherCreated.withElements(Collections.emptyList()),
+        otherImpulse.withElements(Collections.emptyList()),
         Collections.emptyList(),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
@@ -1174,30 +1180,31 @@ public class WatermarkManagerTest implements Serializable {
 
   @Test
   public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() {
-    CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3);
+    CommittedBundle<byte[]> impulseBundle =
+        multiWindowedBundle(impulse, new byte[1], new byte[2], new byte[3]);
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.<CommittedBundle<?>>singleton(created),
+        Collections.<CommittedBundle<?>>singleton(impulseBundle),
         new Instant(29_919_235L));
 
     Instant upstreamHold = new Instant(2048L);
-    CommittedBundle<Integer> filteredBundle =
+    CommittedBundle<byte[]> filteredBundle =
         bundleFactory
             .createKeyedBundle(StructuralKey.of("key", StringUtf8Coder.of()), filtered)
             .commit(upstreamHold);
     manager.updateWatermarks(
-        created,
+        impulseBundle,
         TimerUpdate.empty(),
         graph.getProducer(filtered),
-        created.withElements(Collections.emptyList()),
+        impulseBundle.withElements(Collections.emptyList()),
         Collections.<CommittedBundle<?>>singleton(filteredBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
 
-    TransformWatermarks downstreamWms = manager.getWatermarks(graph.getProducer(filteredTimesTwo));
+    TransformWatermarks downstreamWms = manager.getWatermarks(graph.getProducer(filteredNotEmpty));
     assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
 
     clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1212,13 +1219,13 @@ public class WatermarkManagerTest implements Serializable {
     assertThat(initialTimers, emptyIterable());
 
     // Advance WM of keyed past the first timer, but ahead of the second and third
-    CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
+    CommittedBundle<byte[]> impulseBundle = multiWindowedBundle(filtered);
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.singleton(createdBundle),
+        Collections.singleton(impulseBundle),
         new Instant(1500L));
     manager.refreshAll();
 
@@ -1246,11 +1253,11 @@ public class WatermarkManagerTest implements Serializable {
             .build();
 
     manager.updateWatermarks(
-        createdBundle,
+        impulseBundle,
         update,
         graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+        impulseBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(impulseToFlatten)),
         new Instant(1000L));
     manager.refreshAll();
 
@@ -1263,7 +1270,7 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.emptyList(),
         new Instant(50_000L));
@@ -1298,13 +1305,13 @@ public class WatermarkManagerTest implements Serializable {
     assertThat(initialTimers, emptyIterable());
 
     // Advance WM of keyed past the first timer, but ahead of the second and third
-    CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
+    CommittedBundle<byte[]> impulseBundle = multiWindowedBundle(filtered);
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.singleton(createdBundle),
+        Collections.singleton(impulseBundle),
         new Instant(1500L));
 
     TimerData earliestTimer =
@@ -1334,11 +1341,11 @@ public class WatermarkManagerTest implements Serializable {
             .build();
 
     manager.updateWatermarks(
-        createdBundle,
+        impulseBundle,
         update,
         graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+        impulseBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(impulseToFlatten)),
         new Instant(1000L));
     manager.refreshAll();
 
@@ -1352,7 +1359,7 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.emptyList(),
         new Instant(50_000L));
@@ -1386,13 +1393,13 @@ public class WatermarkManagerTest implements Serializable {
     assertThat(initialTimers, emptyIterable());
 
     // Advance WM of keyed past the first timer, but ahead of the second and third
-    CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
+    CommittedBundle<byte[]> impulseBundle = multiWindowedBundle(filtered);
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.singleton(createdBundle),
+        Collections.singleton(impulseBundle),
         new Instant(1500L));
 
     TimerData earliestTimer =
@@ -1422,11 +1429,11 @@ public class WatermarkManagerTest implements Serializable {
             .build();
 
     manager.updateWatermarks(
-        createdBundle,
+        impulseBundle,
         update,
         graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+        impulseBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(impulseToFlatten)),
         new Instant(1000L));
     manager.refreshAll();
 
@@ -1440,7 +1447,7 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.emptyList(),
         new Instant(50_000L));
@@ -1495,7 +1502,7 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(
         null,
         initialUpdate,
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.emptyList(),
         new Instant(5000L));
@@ -1505,7 +1512,7 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(
         null,
         overridingUpdate,
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
         Collections.emptyList(),
         new Instant(10000L));
@@ -1547,23 +1554,23 @@ public class WatermarkManagerTest implements Serializable {
     TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build();
     TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build();
 
-    CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
+    CommittedBundle<byte[]> impulseBundle = multiWindowedBundle(filtered);
     manager.updateWatermarks(
-        createdBundle,
+        impulseBundle,
         initialUpdate,
         graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+        impulseBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(impulseToFlatten)),
         new Instant(1000L));
     manager.refreshAll();
 
     // This update should override the previous timer.
     manager.updateWatermarks(
-        createdBundle,
+        impulseBundle,
         overridingUpdate,
         graph.getProducer(filtered),
-        createdBundle.withElements(Collections.emptyList()),
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+        impulseBundle.withElements(Collections.emptyList()),
+        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(impulseToFlatten)),
         new Instant(1000L));
     manager.refreshAll();
 
@@ -1571,9 +1578,9 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        graph.getProducer(createdInts),
+        graph.getProducer(impulse),
         null,
-        Collections.singleton(createdBundle),
+        Collections.singleton(impulseBundle),
         new Instant(3000L));
     manager.refreshAll();
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 9cb2e28..e3c4f96 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.InstantCoder;
@@ -62,6 +63,7 @@ import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.sdk.values.ValueWithRecordId.StripIdsDoFn;
 import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -134,9 +136,7 @@ public class Read {
     public final PCollection<T> expand(PBegin input) {
       source.validate();
 
-      if (ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          && !ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) {
+      if (useSdf(input.getPipeline().getOptions())) {
         // We don't use Create here since Create is defined as a BoundedSource and using it would
         // cause an infinite expansion loop. We can reconsider this if Create is implemented
         // directly as a SplittableDoFn.
@@ -209,9 +209,7 @@ public class Read {
     public final PCollection<T> expand(PBegin input) {
       source.validate();
 
-      if (ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          && !ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) {
+      if (useSdf(input.getPipeline().getOptions())) {
         // We don't use Create here since Create is defined as a BoundedSource and using it would
         // cause an infinite expansion loop. We can reconsider this if Create is implemented
         // directly as a SplittableDoFn.
@@ -924,4 +922,31 @@ public class Read {
   }
 
   private static final int DEFAULT_DESIRED_NUM_SPLITS = 20;
+
+  /**
+   * Used to migrate runners to use splittable DoFn without needing to rely on PTransform
+   * replacement which allows removal of the migration code without changing the pipeline shape
+   * since pipeline shape affects pipeline update for some runners.
+   */
+  private static final Set<String> SPLITTABLE_DOFN_PREFERRED_RUNNERS =
+      ImmutableSet.of("DirectRunner");
+
+  private static boolean useSdf(PipelineOptions options) {
+    // TODO(BEAM-10670): Make this by default true and have runners opt-out instead.
+    boolean runnerPrefersSdf = false;
+    try {
+      runnerPrefersSdf =
+          SPLITTABLE_DOFN_PREFERRED_RUNNERS.contains(options.getRunner().getSimpleName());
+    } catch (Exception e) {
+      // Ignore construction failures since there may not be a runner on the classpath if this is a
+      // test.
+    }
+
+    // We keep the old names of experiments around for portable runners and existing users.
+    return (runnerPrefersSdf
+            || ExperimentalOptions.hasExperiment(options, "beam_fn_api")
+            || ExperimentalOptions.hasExperiment(options, "use_sdf_read"))
+        && !(ExperimentalOptions.hasExperiment(options, "beam_fn_api_use_deprecated_read")
+            || ExperimentalOptions.hasExperiment(options, "use_deprecated_read"));
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 86a703f..a00b69a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -25,12 +25,10 @@ import static org.apache.beam.sdk.transforms.Requirements.requiresSideInputs;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeFalse;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -48,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
-import java.util.Set;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
@@ -66,7 +63,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -74,7 +70,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
-import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -83,7 +78,6 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -1494,26 +1488,5 @@ public class AvroIOTest implements Serializable {
     }
     // TODO: for Write only, test withSuffix,
     // withShardNameTemplate and withoutSharding.
-
-    @Test
-    @Category(ValidatesRunner.class)
-    public void testPrimitiveReadDisplayData() {
-      // Read is no longer a primitive transform when using the portability framework.
-      assumeFalse(
-          ExperimentalOptions.hasExperiment(
-              DisplayDataEvaluator.getDefaultOptions(), "beam_fn_api"));
-      DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-
-      AvroIO.Read<GenericRecord> read =
-          AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING))
-              .withBeamSchemas(withBeamSchemas)
-              .from("/foo.*");
-
-      Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-      assertThat(
-          "AvroIO.Read should include the file pattern in its primitive transform",
-          displayData,
-          hasItem(hasDisplayItem("filePattern")));
-    }
   }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
index 4ecfe6a..39266bd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
@@ -23,7 +23,6 @@ import static org.apache.beam.sdk.io.Compression.GZIP;
 import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.core.Is.is;
@@ -209,7 +208,6 @@ public class TFRecordIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testReadInvalidRecord() throws Exception {
-    expectedException.expect(IllegalStateException.class);
     expectedException.expectMessage("Not a valid TFRecord. Fewer than 12 bytes.");
     runTestRead("bar".getBytes(Charsets.UTF_8), new String[0]);
   }
@@ -217,7 +215,6 @@ public class TFRecordIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testReadInvalidLengthMask() throws Exception {
-    expectedException.expectCause(instanceOf(IOException.class));
     expectedException.expectCause(hasMessage(containsString("Mismatch of length mask")));
     byte[] data = BaseEncoding.base64().decode(FOO_RECORD_BASE64);
     data[9] += (byte) 1;
@@ -227,7 +224,6 @@ public class TFRecordIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testReadInvalidDataMask() throws Exception {
-    expectedException.expectCause(instanceOf(IOException.class));
     expectedException.expectCause(hasMessage(containsString("Mismatch of data mask")));
     byte[] data = BaseEncoding.base64().decode(FOO_RECORD_BASE64);
     data[16] += (byte) 1;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index 14c306e..2721356 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -27,19 +27,15 @@ import static org.apache.beam.sdk.io.Compression.GZIP;
 import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED;
 import static org.apache.beam.sdk.io.Compression.ZIP;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeFalse;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -53,14 +49,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
-import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -69,12 +63,10 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
-import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.ToString;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
@@ -495,24 +487,6 @@ public class TextIOReadTest {
       assertThat(displayData, hasDisplayItem("compressionType", BZIP2.toString()));
     }
 
-    @Test
-    @Category(ValidatesRunner.class)
-    public void testPrimitiveReadDisplayData() {
-      // Read is no longer a primitive transform when using the portability framework.
-      assumeFalse(
-          ExperimentalOptions.hasExperiment(
-              DisplayDataEvaluator.getDefaultOptions(), "beam_fn_api"));
-      DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-
-      TextIO.Read read = TextIO.read().from("foobar");
-
-      Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-      assertThat(
-          "TextIO.Read should include the file prefix in its primitive display data",
-          displayData,
-          hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
-    }
-
     /** Options for testing. */
     public interface RuntimeTestOptions extends PipelineOptions {
       ValueProvider<String> getInput();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 3a6ce15..1b415c5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -40,7 +39,6 @@ import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.List;
 import java.util.Optional;
-import java.util.Set;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -67,7 +65,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -516,44 +513,6 @@ public class BigQueryIOReadTest implements Serializable {
   }
 
   @Test
-  public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigQueryIO.Read read =
-        BigQueryIO.read()
-            .from("project:dataset.tableId")
-            .withTestServices(
-                new FakeBigQueryServices()
-                    .withDatasetService(new FakeDatasetService())
-                    .withJobService(new FakeJobService()))
-            .withoutValidation();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat(
-        "BigQueryIO.Read should include the table spec in its primitive display data",
-        displayData,
-        hasItem(hasDisplayItem("table")));
-  }
-
-  @Test
-  public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigQueryIO.Read read =
-        BigQueryIO.read()
-            .fromQuery("foobar")
-            .withTestServices(
-                new FakeBigQueryServices()
-                    .withDatasetService(new FakeDatasetService())
-                    .withJobService(new FakeJobService()))
-            .withoutValidation();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat(
-        "BigQueryIO.Read should include the query in its primitive display data",
-        displayData,
-        hasItem(hasDisplayItem("query")));
-  }
-
-  @Test
   public void testBigQueryIOGetName() {
     assertEquals("BigQueryIO.Read", BigQueryIO.read().from("somedataset.sometable").getName());
   }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index e39ee99..d5d3846 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -53,7 +52,6 @@ import java.io.ByteArrayOutputStream;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
-import java.util.Set;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData.Record;
@@ -81,7 +79,6 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -274,14 +271,6 @@ public class BigQueryIOStorageQueryTest {
   }
 
   @Test
-  public void testEvaluatedDisplayData() throws Exception {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    TypedRead<TableRow> typedRead = getDefaultTypedRead();
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(typedRead);
-    assertThat(displayData, hasItem(hasDisplayItem("query")));
-  }
-
-  @Test
   public void testName() {
     assertEquals("BigQueryIO.TypedRead", getDefaultTypedRead().getName());
   }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index 00ffba2..ac5e1af 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -60,7 +59,6 @@ import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Set;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -87,7 +85,6 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -294,18 +291,6 @@ public class BigQueryIOStorageReadTest {
   }
 
   @Test
-  public void testEvaluatedDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigQueryIO.TypedRead<TableRow> typedRead =
-        BigQueryIO.read(new TableRowParser())
-            .withCoder(TableRowJsonCoder.of())
-            .withMethod(Method.DIRECT_READ)
-            .from("foo.com:project:dataset.table");
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(typedRead);
-    assertThat(displayData, hasItem(hasDisplayItem("table")));
-  }
-
-  @Test
   public void testName() {
     assertEquals(
         "BigQueryIO.TypedRead",
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 02b21c5..8a7d4bf 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -67,7 +67,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
@@ -101,7 +100,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Wait;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -1121,34 +1119,6 @@ public class BigtableIOTest {
   }
 
   @Test
-  public void testReadingPrimitiveDisplayData() throws IOException, InterruptedException {
-    final String table = "fooTable";
-    service.createTable(table);
-
-    RowFilter rowFilter =
-        RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*")).build();
-
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigtableIO.Read read =
-        BigtableIO.read()
-            .withBigtableOptions(BIGTABLE_OPTIONS)
-            .withTableId(table)
-            .withRowFilter(rowFilter)
-            .withBigtableService(service);
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat(
-        "BigtableIO.Read should include the table id in its primitive display data",
-        displayData,
-        Matchers.hasItem(hasDisplayItem("tableId")));
-    assertThat(
-        "BigtableIO.Read should include the row filter, if it exists, in its primitive "
-            + "display data",
-        displayData,
-        Matchers.hasItem(hasDisplayItem("rowFilter")));
-  }
-
-  @Test
   public void testReadingDisplayDataFromRuntimeParameters() {
     ReadOptions options = PipelineOptionsFactory.fromArgs().withValidation().as(ReadOptions.class);
     BigtableIO.Read read =
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
index 90dfbf8..0b3b40b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BooleanCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.expansion.service.ExpansionService;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
@@ -131,7 +132,12 @@ public class PubsubIOExternalTest {
                 .withFieldValue("id_label", idAttribute)
                 .build());
 
-    Pipeline p = Pipeline.create();
+    // Requirements are not passed as part of the expansion service so the validation
+    // fails because of how we construct the pipeline to expand the transform since it now
+    // has a transform with a requirement.
+    Pipeline p =
+        Pipeline.create(
+            PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create());
     p.apply("unbounded", Create.of(1, 2, 3)).setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
 
     RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);