Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/09 21:40:33 UTC

[GitHub] [beam] boyuanzz commented on pull request #13014: Delete unneeded PCollections in pipeline_from_stages()

boyuanzz commented on pull request #13014:
URL: https://github.com/apache/beam/pull/13014#issuecomment-706412811


   It seems this change causes `flink_runner_test.py::FlinkRunnerTestOptimized` to fail with an error like the following (taken from the `test_sdf` case):
   ```
   [grpc-default-executor-0] WARN org.apache.beam.runners.jobsubmission.InMemoryJobService - Encountered Unexpected Exception during validation
   java.lang.RuntimeException: Failed to validate transform ((((ref_PCollection_PCollection_1/Read)+(ref_AppliedPTransform_Create/FlatMap(<lambda at core.py:2945>)_4))+(ref_AppliedPTransform_Create/MaybeReshuffle/Reshuffle/AddRandomKeys_7))+(ref_AppliedPTransform_Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/Map(reify_timestamps)_9))+(ref_PCollection_PCollection_4/Write)
   	at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:221)
   	at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:120)
   	at org.apache.beam.runners.core.construction.graph.PipelineValidator.validate(PipelineValidator.java:100)
   	at org.apache.beam.runners.jobsubmission.InMemoryJobService.run(InMemoryJobService.java:234)
   	at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:961)
   	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
   	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
   	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
   	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
   	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
   	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
   	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
   	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
   	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:834)
   Caused by: java.lang.IllegalArgumentException: ExecutableStage ((((ref_PCollection_PCollection_1/Read)+(ref_AppliedPTransform_Create/FlatMap(<lambda at core.py:2945>)_4))+(ref_AppliedPTransform_Create/MaybeReshuffle/Reshuffle/AddRandomKeys_7))+(ref_AppliedPTransform_Create/MaybeReshuffle/Reshuffle/ReshufflePerKey/Map(reify_timestamps)_9))+(ref_PCollection_PCollection_4/Write) uses unknown output ref_PCollection_PCollection_4
   	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
   	at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateExecutableStage(PipelineValidator.java:306)
   	at org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:219)
   	... 16 more
   
   ```
   
   How to reproduce:
   ```
   ./gradlew runners:flink:1.10:job-server:shadowJar
   pytest sdks/python/apache_beam/runners/portability/flink_runner_test.py::FlinkRunnerTestOptimized::test_sdf --test-pipeline-options " --environment_type=LOOPBACK"
   ```
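
To make the failure mode concrete: the validator rejects the stage because it writes to `ref_PCollection_PCollection_4`, which is no longer present in the pipeline components. The sketch below only illustrates that kind of consistency check. `Stage` and `find_unknown_pcollections` are hypothetical names made up here and are not part of Beam; the real check lives in `PipelineValidator.validateExecutableStage` on the Java side.

```python
from typing import Dict, List, NamedTuple, Set


class Stage(NamedTuple):
    # Hypothetical stand-in for a fused stage; not a Beam class.
    name: str
    inputs: List[str]
    outputs: List[str]


def find_unknown_pcollections(
        stages: List[Stage],
        known_pcollections: Set[str]) -> Dict[str, List[str]]:
    """Map each stage name to the PCollection ids it references
    that are missing from the known set."""
    problems = {}
    for stage in stages:
        referenced = set(stage.inputs) | set(stage.outputs)
        missing = sorted(referenced - known_pcollections)
        if missing:
            problems[stage.name] = missing
    return problems


# Assumed toy data: one fused stage that reads PCollection_1 and
# writes PCollection_4, mirroring the ids in the error above.
stages = [
    Stage(
        name="fused_create",
        inputs=["ref_PCollection_PCollection_1"],
        outputs=["ref_PCollection_PCollection_4"],
    ),
]

# Simulates the component map after PCollection_4 has been dropped.
known = {"ref_PCollection_PCollection_1"}

print(find_unknown_pcollections(stages, known))
# → {'fused_create': ['ref_PCollection_PCollection_4']}
```

If something like this reports a non-empty result right after `pipeline_from_stages()`, a PCollection that a stage still references was likely deleted.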
   

