You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Guozhen Yang (Jira)" <ji...@apache.org> on 2023/06/25 03:06:00 UTC

[jira] [Created] (FLINK-32423) Flink-sql-runner-example application fails if multiple execute() called in one sql file

Guozhen Yang created FLINK-32423:
------------------------------------

             Summary: Flink-sql-runner-example application fails if multiple execute() called in one sql file
                 Key: FLINK-32423
                 URL: https://issues.apache.org/jira/browse/FLINK-32423
             Project: Flink
          Issue Type: Improvement
          Components: Kubernetes Operator
            Reporter: Guozhen Yang


h2. Summary:

flink-sql-runner-example application fails if multiple execute() called in one sql file
h2. Background:

We have a series of batch jobs running on a table partitioned by date. The jobs need to be run sequencially in chronological order. Which means only after the batch job #1 finishes running 2023-06-01 partition, the batch job #2 running 2023-06-02 partition starts running. So we loop through dates and submit multiple jobs in a single application, and the flink application is deployed in application mode with HA turned off.

According to [flink document|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/overview/#application-mode], the Application Mode allows the submission of applications consisting of multiple jobs, but High-Availability is not supported in these cases.
h2. The problem:

The application consisted of multiple jobs fails when the second job is executed.

Stack trace is shown as below:
{noformat}
2023-06-21 03:21:44,720 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.1
    6.2.jar:1.16.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist
    -1.16.2.jar:1.16.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-a
    kka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1ed
    cb5a1.jar:1.16.2]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2
    f4d1edcb5a1.jar:1.16.2]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar
    :1.16.2]
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    ... 14 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.1
    6.2.jar:1.16.2]
    ... 13 more
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.examples.SqlRunner.main(SqlRunner.java:52) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.1
    6.2.jar:1.16.2]
    ... 13 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
    at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:217) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:205) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.examples.SqlRunner.main(SqlRunner.java:52) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.1
    6.2.jar:1.16.2]
    ... 13 more
{noformat}
h2. How to reproduce:

1. Start a minikube cluster
2. Add new script file _two-selects.sql_ to [examples/flink-sql-runner-example/sql-scripts folder|https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example]. 
The contents of _two-selects.sql_ is shown as below.
{noformat}
select 1;
select 1;
{noformat}
3. Follow the [instruction|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/README.md] to build the flink-sql-runner-example image.
4. Use minikube image load command to load the image.
4. Modify [flinkdep yaml file|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/sql-example.yaml], change sepc.job.args to args: ["/opt/flink/usrlib/sql-scripts/two-selects.sql"]. Then apply the flinkdep yaml file.
5. The application fails.
h2. Possible reason:

According to [flink-kubernetes-oeprator document|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/controller-flow/#application-reconciler], flink by default generate deterministic jobids based on clusterId.
{quote}Flink by default generates deterministic jobids based on the clusterId (which is the CR name in our case). This causes checkpoint path conflicts if the job is ever restarted from an empty state (stateless upgrade). We therefore generate a random jobid to avoid this.
{quote}
I found flink-kubernetes-operator always set job id when submitting application. [Corresponding code of setJobIdIfNecessary is here.|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L191C18-L191C37]

But according to [flink's code|https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L213], there are two situations.

1. HA is not activated and job id is not set when submitting application(line 213 to 217). runApplicationAsync is called with enforceSingleJobExecution=false. So mult-job execution is viable.
2. If job id is not set when submitting application(line 218 to 233). Job id is set based on cluster id. After the job is fixed, runApplicationAsync is called with enforceSingleJobExecution=true. So multi-job execution is not viable.

If flink-kubernetes-operator always set job id when submitting application, condition of situation #1 will never match. So application submitted with flink-kubernetes-operator cannot execute multiple jobs, even if the application is deployed in application mode and with HA turned off.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)