Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/11 22:12:19 UTC

[GitHub] [beam] boyuanzz commented on a change in pull request #12519: [BEAM-10670] Make Read execute as a splittable DoFn by default for the Java DirectRunner.

boyuanzz commented on a change in pull request #12519:
URL: https://github.com/apache/beam/pull/12519#discussion_r468874809



##########
File path: runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
##########
@@ -211,10 +211,6 @@ public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
     KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> residual =
         processContext.getTakenCheckpoint();
     if (cont.shouldResume()) {
-      checkState(

Review comment:
       Is it because we have `checkDone` now?
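       For context on the question: in Beam's splittable DoFn model, a `RestrictionTracker` exposes `checkDone()`, which the runner calls after processing and which is expected to throw if part of the restriction was left unprocessed. If every tracker enforces that, a separate runner-side `checkState` on the residual may be redundant, though whether that is the reason for the removal is what the comment asks. The sketch below is a simplified, self-contained illustration of that contract. `SimpleOffsetTracker` is a hypothetical class, not Beam's `OffsetRangeTracker`, and it omits most of its behavior.

```java
/**
 * Hypothetical, simplified offset tracker used only to illustrate a checkDone()-style
 * contract. It is not Beam's OffsetRangeTracker and omits most of its behavior.
 */
final class SimpleOffsetTracker {
  private final long from;    // inclusive start of the restriction
  private long to;            // exclusive end; shrinks when a checkpoint is taken
  private Long lastAttempted; // last offset passed to tryClaim, or null if none

  SimpleOffsetTracker(long from, long to) {
    if (from > to) {
      throw new IllegalArgumentException("from must be <= to");
    }
    this.from = from;
    this.to = to;
  }

  /** Attempts to claim an offset; returns false once the offset is past the range end. */
  boolean tryClaim(long offset) {
    if (offset < from || (lastAttempted != null && offset <= lastAttempted)) {
      throw new IllegalArgumentException("offsets must be claimed in increasing order");
    }
    lastAttempted = offset;
    return offset < to;
  }

  /**
   * Takes a checkpoint: keeps [from, splitPos) as the primary and returns the unclaimed
   * residual [splitPos, to), or null if nothing is left to defer.
   */
  long[] checkpoint() {
    long splitPos = (lastAttempted == null) ? from : lastAttempted + 1;
    if (splitPos >= to) {
      return null;
    }
    long[] residual = {splitPos, to};
    to = splitPos;
    return residual;
  }

  /** Throws if part of the (possibly shrunk) range was never attempted. */
  void checkDone() {
    if (from == to) {
      return; // an empty range is trivially done
    }
    if (lastAttempted == null || lastAttempted < to - 1) {
      throw new IllegalStateException(
          "Range [" + from + ", " + to + ") not fully processed; last attempted: " + lastAttempted);
    }
  }
}
```

       After a checkpoint shrinks the primary range, `checkDone()` passes for the work that was actually attempted; stopping early without taking a checkpoint makes it throw.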

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
##########
@@ -630,4 +635,38 @@ public void tearDown() {
       invoker = null;
     }
   }
+
+  /**
+   * Throws an {@link IllegalArgumentException} if the pipeline contains any primitive read
+   * transforms that have not been expanded to be executed as {@link DoFn splittable DoFns}.
+   */
+  public static void validateNoPrimitiveReads(Pipeline pipeline) {
+    pipeline.traverseTopologically(new ValidateNoPrimitiveReads());
+  }
+
+  /**
+   * A {@link org.apache.beam.sdk.Pipeline.PipelineVisitor} that ensures that the pipeline does not
+   * contain any primitive reads.

Review comment:
       ```suggestion
      * contain any primitive reads when use_deprecated_read is not specified.
   ```
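       The suggested Javadoc describes a pipeline visitor that rejects primitive reads unless the deprecated-read experiment is specified. The sketch below illustrates that validation pattern on a toy graph. `Node` and `NoPrimitiveReadsValidator` are hypothetical stand-ins for Beam's `Pipeline`, `PipelineVisitor`, and `traverseTopologically`; the `isPrimitiveRead` flag and the boolean opt-out parameter are illustrative assumptions, not the PR's actual code.

```java
import java.util.List;

/** Hypothetical, minimal transform node standing in for Beam's pipeline graph. */
final class Node {
  final String name;
  final boolean isPrimitiveRead; // true for a Read not expanded into a splittable DoFn
  final List<Node> children;

  Node(String name, boolean isPrimitiveRead, List<Node> children) {
    this.name = name;
    this.isPrimitiveRead = isPrimitiveRead;
    this.children = children;
  }
}

/** Visitor-style validation: walk the graph and reject any remaining primitive read. */
final class NoPrimitiveReadsValidator {
  static void validate(Node root, boolean useDeprecatedRead) {
    if (useDeprecatedRead) {
      return; // opting into the deprecated Read means primitive reads are expected
    }
    visit(root);
  }

  // Depth-first walk over the composite hierarchy.
  private static void visit(Node node) {
    if (node.isPrimitiveRead) {
      throw new IllegalArgumentException(
          "Found primitive read " + node.name + " not expanded to a splittable DoFn");
    }
    for (Node child : node.children) {
      visit(child);
    }
  }
}
```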

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
##########
@@ -179,6 +180,11 @@ public DirectPipelineResult run(Pipeline pipeline) {
 
       DisplayDataValidator.validatePipeline(pipeline);
       DisplayDataValidator.validateOptions(options);
+      // TODO(BEAM-10670): Remove the deprecated Read and make the splittable DoFn the only option.
+      if (!(ExperimentalOptions.hasExperiment(options, "beam_fn_api_use_deprecated_read")

Review comment:
       Is it possible to merge `beam_fn_api_use_deprecated_read` and `use_deprecated_read` into a single `use_deprecated_read` experiment, since they seem to be the same?
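       One way to read this suggestion is to keep `use_deprecated_read` as the canonical name while still accepting the older name as an alias during a migration period. A minimal sketch, assuming experiments are plain strings (as with `ExperimentalOptions.hasExperiment`); the class `DeprecatedReadFlag` and its `useDeprecatedRead` helper are hypothetical.

```java
import java.util.List;

/** Hypothetical helper treating both experiment names as one switch. */
final class DeprecatedReadFlag {
  /** Canonical experiment name proposed in the review. */
  static final String CANONICAL = "use_deprecated_read";
  /** Legacy alias still accepted so existing pipelines keep working. */
  static final String LEGACY_ALIAS = "beam_fn_api_use_deprecated_read";

  /** Returns true if any enabled experiment selects the deprecated Read path. */
  static boolean useDeprecatedRead(List<String> experiments) {
    if (experiments == null) {
      return false;
    }
    for (String experiment : experiments) {
      // equals() on the constants is null-safe for null list elements.
      if (CANONICAL.equals(experiment) || LEGACY_ALIAS.equals(experiment)) {
        return true;
      }
    }
    return false;
  }
}
```

       Centralizing the check in one place means the runner's condition would test a single predicate instead of two experiment strings, and the alias can later be dropped without touching call sites.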

##########
File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
##########
@@ -167,4 +180,55 @@ public void testBoundednessForUnboundedFn() {
                 "unbounded to unbounded", makeUnboundedCollection(pipeline), unboundedFn)
             .isBounded());
   }
+
+  private static class FakeBoundedSource extends BoundedSource<String> {
+    @Override
+    public List<? extends BoundedSource<String>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      return Collections.singletonList(this);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return 0;
+    }
+
+    @Override
+    public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Coder<String> getOutputCoder() {
+      return StringUtf8Coder.of();
+    }
+  }
+
+  @Test
+  public void testValidateThatThereAreNoPrimitiveReads() {

Review comment:
       Can we add a block that tests using `use_deprecated_read`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org