Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 20:51:04 UTC

[GitHub] [beam] damccorm opened a new issue, #20992: pipeline.run() blocks when using FlinkRunner

damccorm opened a new issue, #20992:
URL: https://github.com/apache/beam/issues/20992

   `pipeline.run()` is documented to be asynchronous (cf. [create-your-pipeline](https://beam.apache.org/documentation/pipelines/create-your-pipeline/)). It seems that when using FlinkRunner (embedded or remote) the call blocks until the pipeline finishes.
   
   Digging into the Flink code, I found that both `LocalStreamEnvironment` and `RemoteStreamEnvironment` set `execution.attached` to true. This causes `StreamExecutionEnvironment.execute` to block later on:
   
   ```
       public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
           final JobClient jobClient = executeAsync(streamGraph);
   
           try {
               final JobExecutionResult jobExecutionResult;
   
               if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                   jobExecutionResult = jobClient.getJobExecutionResult().get(); // <==== execution is blocked here
               } else {
                   jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
               }
   
               jobListeners.forEach(
                       jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
   
               return jobExecutionResult;
           } catch (Throwable t) {
               // get() on the JobExecutionResult Future will throw an ExecutionException. This
               // behaviour was largely not there in Flink versions before the PipelineExecutor
               // refactoring so we should strip that exception.
               Throwable strippedException = ExceptionUtils.stripExecutionException(t);
   
               jobListeners.forEach(
                       jobListener -> {
                           jobListener.onJobExecuted(null, strippedException);
                       });
               ExceptionUtils.rethrowException(strippedException);
   
               // never reached, only make javac happy
               return null;
           }
       }
   ```
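   
   The attached/detached branch in the snippet above can be illustrated without Flink. What follows is a minimal sketch, assuming a `CompletableFuture` stands in for the result future returned by the `JobClient`. The class and method names (`AttachedVsDetachedSketch`, `executeAsync`, `execute`, `timeExecute`) are hypothetical and are not part of Flink or Beam.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   
   /** Hypothetical stand-in for the attached/detached branch; not Flink or Beam code. */
   public class AttachedVsDetachedSketch {
   
       /** Simulates job submission: returns at once, completes after jobMillis. */
       public static CompletableFuture<String> executeAsync(long jobMillis) {
           return CompletableFuture.supplyAsync(() -> {
               try {
                   TimeUnit.MILLISECONDS.sleep(jobMillis);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new IllegalStateException(e);
               }
               return "FINISHED";
           });
       }
   
       /** Mirrors the if/else above: attached waits on the future, detached does not. */
       public static String execute(boolean attached, long jobMillis) throws Exception {
           CompletableFuture<String> result = executeAsync(jobMillis);
           if (attached) {
               return result.get(); // blocks the caller until the simulated job completes
           }
           return "DETACHED"; // returns while the simulated job is still running
       }
   
       /** Returns how long execute(...) kept the caller waiting, in milliseconds. */
       public static long timeExecute(boolean attached, long jobMillis) throws Exception {
           long start = System.nanoTime();
           execute(attached, jobMillis);
           return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
       }
   
       public static void main(String[] args) throws Exception {
           System.out.println("attached blocked for ~" + timeExecute(true, 300) + " ms");
           System.out.println("detached blocked for ~" + timeExecute(false, 300) + " ms");
       }
   }
   ```
   
   In this sketch the attached call keeps the caller waiting for roughly the whole simulated job, while the detached call returns almost immediately. If FlinkRunner ends up on the attached branch, `pipeline.run()` would be expected to behave like the first call.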
   
   
   Imported from Jira [BEAM-12477](https://issues.apache.org/jira/browse/BEAM-12477). Original Jira may contain additional context.
   Reported by: stefan.wachter@gmx.de.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] xsolo commented on issue #20992: pipeline.run() blocks when using FlinkRunner

Posted by GitBox <gi...@apache.org>.
xsolo commented on issue #20992:
URL: https://github.com/apache/beam/issues/20992#issuecomment-1374895232

   We have the same issue and would like to know whether this is on the roadmap. We use Flink 1.15.3.



[GitHub] [beam] github-ramA7 commented on issue #20992: pipeline.run() blocks when using FlinkRunner

Posted by GitBox <gi...@apache.org>.
github-ramA7 commented on issue #20992:
URL: https://github.com/apache/beam/issues/20992#issuecomment-1337009789

   Are there any further updates on this? I am looking to use the periodic metric pusher, but, as mentioned in the description, the client execution is blocked. I also see that the metric pusher class is registered after the result object is obtained.

