Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 20:51:04 UTC
[GitHub] [beam] damccorm opened a new issue, #20992: pipeline.run() blocks when using FlinkRunner
damccorm opened a new issue, #20992:
URL: https://github.com/apache/beam/issues/20992
`pipeline.run()` is documented to be asynchronous (cf. [create-your-pipeline](https://beam.apache.org/documentation/pipelines/create-your-pipeline/)). It seems that when using FlinkRunner (embedded or remote) the call blocks until the pipeline finishes.
Digging into the Flink code, I found that both `LocalStreamEnvironment` and `RemoteStreamEnvironment` set `execution.attached` to true. As a result, `StreamExecutionEnvironment.execute` later blocks while waiting for the job result:
```
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    final JobClient jobClient = executeAsync(streamGraph);

    try {
        final JobExecutionResult jobExecutionResult;

        if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
            jobExecutionResult = jobClient.getJobExecutionResult().get(); // <==== execution is blocked here
        } else {
            jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }

        jobListeners.forEach(
                jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

        return jobExecutionResult;
    } catch (Throwable t) {
        // get() on the JobExecutionResult Future will throw an ExecutionException. This
        // behaviour was largely not there in Flink versions before the PipelineExecutor
        // refactoring so we should strip that exception.
        Throwable strippedException = ExceptionUtils.stripExecutionException(t);

        jobListeners.forEach(
                jobListener -> {
                    jobListener.onJobExecuted(null, strippedException);
                });
        ExceptionUtils.rethrowException(strippedException);

        // never reached, only make javac happy
        return null;
    }
}
```
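To make the effect of the `ATTACHED` branch easier to see in isolation, here is a minimal sketch that uses only the JDK and neither Flink nor Beam. Everything in it is a hypothetical stand-in: `submitJob` plays the role of `executeAsync`, the returned future plays the role of `jobClient.getJobExecutionResult()`, and the strings `"FINISHED"` and `"DETACHED"` replace the real result types. It is only meant to show why waiting on the future makes the caller block, not how Flink is actually implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AttachedVsDetached {

    // Hypothetical stand-in for executeAsync(): starts a "job" in the background
    // and returns a future that completes once the job has finished.
    static CompletableFuture<String> submitJob(long jobDurationMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(jobDurationMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "FINISHED";
        });
    }

    // Mirrors the shape of the branch in execute(): in attached mode the caller
    // waits on the future, in detached mode it returns immediately.
    static String execute(boolean attached, long jobDurationMillis) throws Exception {
        CompletableFuture<String> result = submitJob(jobDurationMillis);
        if (attached) {
            return result.get(); // the caller blocks here until the job is done
        }
        return "DETACHED"; // placeholder for a detached result
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        String detached = execute(false, 500);
        long detachedMillis = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        String attached = execute(true, 500);
        long attachedMillis = (System.nanoTime() - start) / 1_000_000;

        System.out.println("detached: " + detached + " after ~" + detachedMillis + " ms");
        System.out.println("attached: " + attached + " after ~" + attachedMillis + " ms");
    }
}
```

In this sketch the detached call returns as soon as the job is submitted, while the attached call only returns once the simulated job has finished, which matches the blocking behaviour described above.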
Imported from Jira [BEAM-12477](https://issues.apache.org/jira/browse/BEAM-12477). Original Jira may contain additional context.
Reported by: stefan.wachter@gmx.de.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] xsolo commented on issue #20992: pipeline.run() blocks when using FlinkRunner
Posted by GitBox <gi...@apache.org>.
xsolo commented on issue #20992:
URL: https://github.com/apache/beam/issues/20992#issuecomment-1374895232
We have the same issue and would like to know whether this is on the roadmap. We use Flink 1.15.3.
[GitHub] [beam] github-ramA7 commented on issue #20992: pipeline.run() blocks when using FlinkRunner
Posted by GitBox <gi...@apache.org>.
github-ramA7 commented on issue #20992:
URL: https://github.com/apache/beam/issues/20992#issuecomment-1337009789
Are there any further updates on this? I am looking to use the periodic metrics pusher, but as mentioned in the description, the client execution gets blocked. I also see that the metrics pusher class is only registered after the result object has been obtained.