Posted to user@flink.apache.org by Yuval Itzchakov <yu...@gmail.com> on 2021/12/21 18:35:37 UTC

Waiting on streaming job with StatementSet.execute() when using Web Submission

Hi,

Flink 1.14.2
Scala 2.12

I have a streaming job that executes, and I want to wait indefinitely for its
completion, or until an exception is thrown during initialization. When using
*statementSet.execute().await()*, I get an error:

Caused by: org.apache.flink.util.FlinkRuntimeException: The Job Result cannot be fetched through the Job Client when in Web Submission.
    at org.apache.flink.client.deployment.application.WebSubmissionJobClient.getJobExecutionResult(WebSubmissionJobClient.java:88)
    at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
    ... 7 more

This is because the Web Submission via the REST API is using
the WebSubmissionJobClient.

How can I wait on my Flink SQL streaming job when submitting through the
REST API?
-- 
Best Regards,
Yuval Itzchakov

Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

You mentioned a status code, so I guess you're talking about the client
that submits the job, not the job itself. Flink jobs do not have "exit
codes"; they only have statuses such as RUNNING and FINISHED.

When you run your user code locally, the job runs in a testing
mini-cluster inside the same JVM as your client code. So if your client code
exits, the JVM exits too and the job does not finish properly. However, if
you submit your job to a cluster, it runs in the remote cluster. In that case
it doesn't matter whether the client has exited or not.

You can check the Flink web UI to see whether your job has been submitted and
is running in the cluster.
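
The same check can be scripted from the process that submitted the job by
polling the REST API's /jobs/<jobid> endpoint. What follows is a minimal
sketch, not an official client: the object name JobStatePoller, the helper
names, and the 5-second interval are all made up for illustration, and the
regex-based extraction assumes the response carries a top-level "state" field
with an upper-case job status. A real JSON library would be more robust.

```scala
import scala.annotation.tailrec
import scala.io.Source

object JobStatePoller {
  // Globally terminal job states in Flink; anything else means "keep polling".
  val TerminalStates: Set[String] = Set("FINISHED", "FAILED", "CANCELED")

  // Matches a "state" field whose value is an upper-case status name.
  private val StatePattern = "\"state\"\\s*:\\s*\"([A-Z_]+)\"".r

  // Pulls the job state out of the JSON body returned by GET /jobs/<jobid>.
  // The regex keeps this sketch dependency-free (assumption: first match is the job state).
  def extractState(json: String): Option[String] =
    StatePattern.findFirstMatchIn(json).map(_.group(1))

  def isTerminal(state: String): Boolean = TerminalStates.contains(state)

  // Fetches the raw JSON for one job from the REST endpoint.
  def fetchJobJson(baseUrl: String, jobId: String): String = {
    val source = Source.fromURL(s"$baseUrl/jobs/$jobId", "UTF-8")
    try source.mkString finally source.close()
  }

  // Polls until the job reaches a terminal state and returns that state.
  // For a healthy streaming job this blocks until the job fails or is canceled.
  @tailrec
  def awaitTerminalState(baseUrl: String, jobId: String, intervalMs: Long = 5000L): String =
    extractState(fetchJobJson(baseUrl, jobId)) match {
      case Some(state) if isTerminal(state) => state
      case _ =>
        Thread.sleep(intervalMs)
        awaitTerminalState(baseUrl, jobId, intervalMs)
    }
}
```

A call might look like
JobStatePoller.awaitTerminalState("http://localhost:8081", jobId), where the
address is an assumption (a REST endpoint on the default port) and jobId is
the ID reported at submission time.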

Yuval Itzchakov <yu...@gmail.com> wrote on Wed, Dec 22, 2021 at 14:48:

> I mean it finishes successfully and exits with status code 0. Both when
> running locally and submitting to the cluster.
>
> On Wed, Dec 22, 2021, 08:36 Caizhi Weng <ts...@gmail.com> wrote:
>
>> Hi!
>>
>> By "the streaming job stops" do you mean the job ends with CANCELED state
>> instead of FINISHED state? Which kind of job are you running? Is it a
>> select job or an insert job? Insert jobs should run continuously once
>> they're submitted. Could you share your user code if possible?
>>
>> Yuval Itzchakov <yu...@gmail.com> wrote on Wed, Dec 22, 2021 at 14:11:
>>
>>> Hi Caizhi,
>>>
>>> If I don't block on statementset.execute, the job finishes immediately
>>> with exit code 0 and the streaming job stops, and that's not what I want. I
>>> somehow need to block.
>>>
>>>
>>>
>>> On Wed, Dec 22, 2021, 03:43 Caizhi Weng <ts...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> You can poll the status of that job with REST API [1]. You can tell
>>>> that the job successfully finishes by the FINISHED state and that the job
>>>> fails by the FAILED state.
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid
>>>>

Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Caizhi,
This is our program printing out the status code, but it doesn't really
matter. The point is that I have no ability to run a StatementSet through
the Web Submission REST API without blocking ATM.

On Wed, Dec 22, 2021 at 1:39 PM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> This "Finished successfully with value: 0" seems quite suspicious. If you
> search through the code base no log is printing such information. Could you
> please check which component is printing this log and determine which
> process this exit code belongs to?
>
> Yuval Itzchakov <yu...@gmail.com> wrote on Wed, Dec 22, 2021 at 15:48:
>
>> The job construction itself is a bit complex, but it can either be a
>> StatementSet that's being filled, or there is some kind of conversion Table
>> -> DataStream and then we put the transformations on the DataStream itself.
>> Invocation looks like this:
>>
>>       executionEffect =
>>         if (...)
>>           FlinkTask.lockedEffect(flink.execute(jobName))
>>         else FlinkTask.lockedEffect(statementSet.execute())
>>
>> If I don't infinitely block on this, it terminates right after starting
>> the execution:
>>
>> 2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor class ... does not
>> contain a setter for field partitionKey
>> 2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor Class class ...
>> cannot be used as a POJO type because not all fields are valid POJO fields,
>> and must be processed as GenericType. Please read the Flink documentation
>> on "Data Types & Serialization" for details of the effect on performance.
>> 2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor class ... does not
>> contain a setter for field stage
>> 2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor Class class ...
>> cannot be used as a POJO type because not all fields are valid POJO fields,
>> and must be processed as GenericType. Please read the Flink documentation
>> on "Data Types & Serialization" for details of the effect on performance.
>> 2021-12-22 09:25:27,678 WARN o.a.f.c.Configuration Config uses deprecated
>> configuration key 'akka.client.timeout' instead of proper key
>> 'client.timeout'
>> 2021-12-22 09:25:27,841 INFO o.a.f.c.d.a.e.EmbeddedExecutor Job
>> 492c9f07d8b3458a52595ab49f636205 is submitted.
>> 2021-12-22 09:25:27,842 INFO o.a.f.c.d.a.e.EmbeddedExecutor Submitting
>> Job with JobId=492c9f07d8b3458a52595ab49f636205.
>> 2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Received JobGraph
>> submission '....' (492c9f07d8b3458a52595ab49f636205).
>> 2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Submitting job '....'
>> (492c9f07d8b3458a52595ab49f636205).
>> 2021-12-22 09:25:28,519 INFO o.a.f.r.r.a.AkkaRpcService Starting RPC
>> endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
>> akka://flink/user/rpc/jobmanager_2 .
>> 2021-12-22 09:25:28,528 INFO o.a.f.r.j.JobMaster Initializing job '...'
>> (492c9f07d8b3458a52595ab49f636205).
>> 2021-12-22 09:25:28,554 INFO o.a.f.r.s.DefaultSchedulerFactory Using
>> restart back off time strategy
>> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1,
>> backoffTimeMS=10000) for ... (492c9f07d8b3458a52595ab49f636205).
>> 2021-12-22 09:25:28,599 INFO o.a.f.r.e.DefaultExecutionGraphBuilder
>> Running initialization on master for job ...
>> (492c9f07d8b3458a52595ab49f636205).
>> 2021-12-22 09:25:28,600 INFO o.a.f.r.e.DefaultExecutionGraphBuilder
>> Successfully ran initialization on master in 0 ms.
>> 2021-12-22 09:25:28,621 INFO o.a.f.r.s.a.DefaultExecutionTopology Built 1
>> pipelined regions in 0 ms
>> 2021-12-22 09:25:28,679 INFO o.a.f.r.s.StateBackendLoader No state
>> backend has been configured, using default (HashMap)
>> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4c81fc2e
>> 2021-12-22 09:25:28,680 INFO o.a.f.r.s.StateBackendLoader State backend
>> loader loads the state backend as HashMapStateBackend
>> 2021-12-22 09:25:28,681 INFO o.a.f.r.s.CheckpointStorageLoader Checkpoint
>> storage is set to 'jobmanager'
>> 2021-12-22 09:25:28,701 INFO o.a.f.r.c.CheckpointCoordinator No
>> checkpoint found during restore.
>> 2021-12-22 09:25:28,702 INFO o.a.f.r.c.CheckpointCoordinator Starting job
>> 492c9f07d8b3458a52595ab49f636205 from savepoint .... (allowing non restored
>> state)
>> 2021-12-22 09:25:28,727 INFO o.a.f.r.c.CheckpointCoordinator Reset the
>> checkpoint ID of job 492c9f07d8b3458a52595ab49f636205 to 8400.
>> 2021-12-22 09:25:28,728 INFO o.a.f.r.c.CheckpointCoordinator Restoring
>> job 492c9f07d8b3458a52595ab49f636205 from Savepoint 8399 @ 0 for
>> 492c9f07d8b3458a52595ab49f636205 located at file:..
>> 2021-12-22 09:25:28,745 INFO o.a.f.r.c.h.MasterHooks No master state to
>> restore
>> 2021-12-22 09:25:28,750 INFO o.a.f.r.s.DefaultScheduler Using failover
>> strategy
>> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3114dd24
>> for ... (492c9f07d8b3458a52595ab49f636205).
>> 2021-12-22 09:25:28,764 INFO o.a.f.r.j.JobMaster Starting execution of
>> job '...' (492c9f07d8b3458a52595ab49f636205) under job master id
>> 00000000000000000000000000000000.
>> 2021-12-22 09:25:28,765 INFO o.a.f.r.s.DefaultScheduler Starting
>> scheduling with scheduling strategy
>> [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
>> 2021-12-22 09:25:28,766 INFO o.a.f.r.e.DefaultExecutionGraph Job ...
>> (492c9f07d8b3458a52595ab49f636205) switched from state CREATED to RUNNING.
>> 2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution Source: ... (1/1)
>> (3c3260f3f0c7d82452a46fc383ceb932) switched from CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution ... -> Map ->
>> Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched
>> from CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map ->
>> Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched
>> from CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map ->
>> (Calc(select=[...]) (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched from
>> CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
>> SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf)
>> switched from CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
>> SnowflakeSinkProvider(...) (2/3) (8d046c60a84900cba31877ec28f81124)
>> switched from CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
>> SnowflakeSinkProvider(...) (3/3) (39a1afd89f627816f018fa9652865887)
>> switched from CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,790 INFO o.a.f.r.j.JobMaster Connecting to
>> ResourceManager akka.tcp://flink@localhost
>> :6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
>> 2021-12-22 09:25:28,794 INFO o.a.f.r.r.RetryingRegistration Resolved
>> ResourceManager address, beginning registration
>> 2021-12-22 09:25:28,796 INFO o.a.f.r.r.ResourceManager Registering job
>> manager 00000000000000000000000000000000@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2
>> for job 492c9f07d8b3458a52595ab49f636205.
>> 2021-12-22 09:25:28,800 INFO o.a.f.r.r.ResourceManager Registered job
>> manager 00000000000000000000000000000000@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2
>> for job 492c9f07d8b3458a52595ab49f636205.
>> 2021-12-22 09:25:28,801 INFO o.a.f.r.j.JobMaster JobManager successfully
>> registered at ResourceManager, leader id: 00000000000000000000000000000000.
>> 2021-12-22 09:25:28,803 INFO o.a.f.r.r.s.DeclarativeSlotManager Received
>> resource requirements from job 492c9f07d8b3458a52595ab49f636205:
>> [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
>> numberOfRequiredSlots=3}]
>> 2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Source: ... (1/1)
>> (3c3260f3f0c7d82452a46fc383ceb932) switched from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Deploying Source: ...
>> (1/1) (attempt #0) with attempt id 3c3260f3f0c7d82452a46fc383ceb932 to
>> ...:64216-650fc2 @ ... (dataPort=64218) with allocation id
>> 2e63675e30c595a8538f7a006fe0678d
>> 2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution ... -> Map ->
>> Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched
>> from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution Deploying c... -> Map ->
>> Calc(select=[...]) -> Map (1/3) (attempt #0) with attempt id
>> 3c05f0bd5ca1bd4903398bb39b5992fa to ... (dataPort=64218) with allocation id
>> 2e63675e30c595a8538f7a006fe0678d
>> 2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution ... -> Map ->
>> Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched
>> from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution Deploying ... -> Map ->
>> Calc(select=[...]) -> Map (2/3) (attempt #0) with attempt id
>> 644722a664ac6a9797b8638a225dbbf9 to ... (dataPort=64218) with allocation id
>> fe0a5941283557538901c8a9774a2584
>> 2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution ... -> Map ->
>> Calc(select=[...]) -> Map (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched
>> from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution Deploying ... -> Map ->
>> Calc(select=[...]) -> Map (3/3) (attempt #0) with attempt id
>> 00cf0b3a6d...d8b393921 to ... (dataPort=64218) with allocation id
>> 026dabf16a12ddf35399938466a27572
>> 2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
>> SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf)
>> switched from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution Deploying ...-batch ->
>> Sink: SnowflakeSinkProvider(...) (1/3) (attempt #0) with attempt id
>> bcaeb5103effbbddc2b4fc7ad801abbf to ... (dataPort=64218) with allocation id
>> 2e63675e30c595a8538f7a006fe0678d
>> 2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
>> SnowflakeSinkProvider(...) (2/3)  (8d046c60a84900cba31877ec28f81124)
>> switched from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution Deploying ...-sink-batch
>> -> Sink: SnowflakeSinkProvider(...) (2/3)  (attempt #0) with attempt id
>> 8d046c60a84900cba31877ec28f81124 to ... (dataPort=64218) with allocation id
>> fe0a5941283557538901c8a9774a2584
>> 2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
>> SnowflakeSinkProvider(...) (3/3)  (39a1afd89f627816f018fa9652865887)
>> switched from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution Deploying ...-sink-batch
>> -> Sink: SnowflakeSinkProvider(...) (3/3)  (attempt #0) with attempt id
>> 39a1afd89f627816f018fa9652865887 to ... (dataPort=64218) with allocation id
>> 026dabf16a12ddf35399938466a27572
>> 2021-12-22 09:25:28,917 INFO Finished successfully with value: 0
>> 2021-12-22 09:25:28,922 INFO o.a.f.r.e.ClusterEntrypoint Shutting
>> StandaloneSessionClusterEntrypoint down with application status UNKNOWN.
>> Diagnostics Cluster entrypoint has been closed externally..
>> 2021-12-22 09:25:28,923 INFO o.a.f.r.r.RestServerEndpoint Shutting down
>> rest endpoint.
>> 2021-12-22 09:25:28,943 INFO o.a.f.r.b.BlobServer Stopped BLOB server at
>> 0.0.0.0:64213
>>
>> Process finished with exit code 239
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>

-- 
Best Regards,
Yuval Itzchakov.

Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

This "Finished successfully with value: 0" seems quite suspicious. If you
search through the code base no log is printing such information. Could you
please check which component is printing this log and determine which
process this exit code belongs to?

Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

Posted by Yuval Itzchakov <yu...@gmail.com>.
The job construction itself is a bit complex, but it can either be a
StatementSet that's being filled, or there is some kind of conversion Table
-> DataStream and then we put the transformations on the DataStream itself.
Invocation looks like this:

      executionEffect =
        if (...)
          FlinkTask.lockedEffect(flink.execute(jobName))
        else FlinkTask.lockedEffect(statementSet.execute())

If I don't block on this indefinitely, it terminates right after starting
the execution:

2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor class ... does not
contain a setter for field partitionKey
2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor Class class ...
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation
on "Data Types & Serialization" for details of the effect on performance.
2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor class ... does not
contain a setter for field stage
2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor Class class ...
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation
on "Data Types & Serialization" for details of the effect on performance.
2021-12-22 09:25:27,678 WARN o.a.f.c.Configuration Config uses deprecated
configuration key 'akka.client.timeout' instead of proper key
'client.timeout'
2021-12-22 09:25:27,841 INFO o.a.f.c.d.a.e.EmbeddedExecutor Job
492c9f07d8b3458a52595ab49f636205 is submitted.
2021-12-22 09:25:27,842 INFO o.a.f.c.d.a.e.EmbeddedExecutor Submitting Job
with JobId=492c9f07d8b3458a52595ab49f636205.
2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Received JobGraph
submission '....' (492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Submitting job '....'
(492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,519 INFO o.a.f.r.r.a.AkkaRpcService Starting RPC
endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
akka://flink/user/rpc/jobmanager_2 .
2021-12-22 09:25:28,528 INFO o.a.f.r.j.JobMaster Initializing job '...'
(492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,554 INFO o.a.f.r.s.DefaultSchedulerFactory Using
restart back off time strategy
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1,
backoffTimeMS=10000) for ... (492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,599 INFO o.a.f.r.e.DefaultExecutionGraphBuilder Running
initialization on master for job ... (492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,600 INFO o.a.f.r.e.DefaultExecutionGraphBuilder
Successfully ran initialization on master in 0 ms.
2021-12-22 09:25:28,621 INFO o.a.f.r.s.a.DefaultExecutionTopology Built 1
pipelined regions in 0 ms
2021-12-22 09:25:28,679 INFO o.a.f.r.s.StateBackendLoader No state backend
has been configured, using default (HashMap)
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4c81fc2e
2021-12-22 09:25:28,680 INFO o.a.f.r.s.StateBackendLoader State backend
loader loads the state backend as HashMapStateBackend
2021-12-22 09:25:28,681 INFO o.a.f.r.s.CheckpointStorageLoader Checkpoint
storage is set to 'jobmanager'
2021-12-22 09:25:28,701 INFO o.a.f.r.c.CheckpointCoordinator No checkpoint
found during restore.
2021-12-22 09:25:28,702 INFO o.a.f.r.c.CheckpointCoordinator Starting job
492c9f07d8b3458a52595ab49f636205 from savepoint .... (allowing non restored
state)
2021-12-22 09:25:28,727 INFO o.a.f.r.c.CheckpointCoordinator Reset the
checkpoint ID of job 492c9f07d8b3458a52595ab49f636205 to 8400.
2021-12-22 09:25:28,728 INFO o.a.f.r.c.CheckpointCoordinator Restoring job
492c9f07d8b3458a52595ab49f636205 from Savepoint 8399 @ 0 for
492c9f07d8b3458a52595ab49f636205 located at file:..
2021-12-22 09:25:28,745 INFO o.a.f.r.c.h.MasterHooks No master state to
restore
2021-12-22 09:25:28,750 INFO o.a.f.r.s.DefaultScheduler Using failover
strategy
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3114dd24
for ... (492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,764 INFO o.a.f.r.j.JobMaster Starting execution of job
'...' (492c9f07d8b3458a52595ab49f636205) under job master id
00000000000000000000000000000000.
2021-12-22 09:25:28,765 INFO o.a.f.r.s.DefaultScheduler Starting scheduling
with scheduling strategy
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2021-12-22 09:25:28,766 INFO o.a.f.r.e.DefaultExecutionGraph Job ...
(492c9f07d8b3458a52595ab49f636205) switched from state CREATED to RUNNING.
2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution Source: ... (1/1)
(3c3260f3f0c7d82452a46fc383ceb932) switched from CREATED to SCHEDULED.
2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution ... -> Map ->
Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched
from CREATED to SCHEDULED.
2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map ->
Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched
from CREATED to SCHEDULED.
2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map ->
(Calc(select=[...]) (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched from
CREATED to SCHEDULED.
2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf)
switched from CREATED to SCHEDULED.
2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
SnowflakeSinkProvider(...) (2/3) (8d046c60a84900cba31877ec28f81124)
switched from CREATED to SCHEDULED.
2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
SnowflakeSinkProvider(...) (3/3) (39a1afd89f627816f018fa9652865887)
switched from CREATED to SCHEDULED.
2021-12-22 09:25:28,790 INFO o.a.f.r.j.JobMaster Connecting to
ResourceManager
akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
2021-12-22 09:25:28,794 INFO o.a.f.r.r.RetryingRegistration Resolved
ResourceManager address, beginning registration
2021-12-22 09:25:28,796 INFO o.a.f.r.r.ResourceManager Registering job
manager 00000000000000000000000000000000@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2
for job 492c9f07d8b3458a52595ab49f636205.
2021-12-22 09:25:28,800 INFO o.a.f.r.r.ResourceManager Registered job
manager 00000000000000000000000000000000@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2
for job 492c9f07d8b3458a52595ab49f636205.
2021-12-22 09:25:28,801 INFO o.a.f.r.j.JobMaster JobManager successfully
registered at ResourceManager, leader id: 00000000000000000000000000000000.
2021-12-22 09:25:28,803 INFO o.a.f.r.r.s.DeclarativeSlotManager Received
resource requirements from job 492c9f07d8b3458a52595ab49f636205:
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
numberOfRequiredSlots=3}]
2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Source: ... (1/1)
(3c3260f3f0c7d82452a46fc383ceb932) switched from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Deploying Source: ...
(1/1) (attempt #0) with attempt id 3c3260f3f0c7d82452a46fc383ceb932 to
...:64216-650fc2 @ ... (dataPort=64218) with allocation id
2e63675e30c595a8538f7a006fe0678d
2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution ... -> Map ->
Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched
from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution Deploying c... -> Map ->
Calc(select=[...]) -> Map (1/3) (attempt #0) with attempt id
3c05f0bd5ca1bd4903398bb39b5992fa to ... (dataPort=64218) with allocation id
2e63675e30c595a8538f7a006fe0678d
2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution ... -> Map ->
Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched
from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution Deploying ... -> Map ->
Calc(select=[...]) -> Map (2/3) (attempt #0) with attempt id
644722a664ac6a9797b8638a225dbbf9 to ... (dataPort=64218) with allocation id
fe0a5941283557538901c8a9774a2584
2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution ... -> Map ->
Calc(select=[...]) -> Map (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched
from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution Deploying ... -> Map ->
Calc(select=[...]) -> Map (3/3) (attempt #0) with attempt id
00cf0b3a6d...d8b393921 to ... (dataPort=64218) with allocation id
026dabf16a12ddf35399938466a27572
2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf)
switched from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution Deploying ...-batch ->
Sink: SnowflakeSinkProvider(...) (1/3) (attempt #0) with attempt id
bcaeb5103effbbddc2b4fc7ad801abbf to ... (dataPort=64218) with allocation id
2e63675e30c595a8538f7a006fe0678d
2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
SnowflakeSinkProvider(...) (2/3)  (8d046c60a84900cba31877ec28f81124)
switched from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution Deploying ...-sink-batch
-> Sink: SnowflakeSinkProvider(...) (2/3)  (attempt #0) with attempt id
8d046c60a84900cba31877ec28f81124 to ... (dataPort=64218) with allocation id
fe0a5941283557538901c8a9774a2584
2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
SnowflakeSinkProvider(...) (3/3)  (39a1afd89f627816f018fa9652865887)
switched from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution Deploying ...-sink-batch
-> Sink: SnowflakeSinkProvider(...) (3/3)  (attempt #0) with attempt id
39a1afd89f627816f018fa9652865887 to ... (dataPort=64218) with allocation id
026dabf16a12ddf35399938466a27572
2021-12-22 09:25:28,917 INFO Finished successfully with value: 0
2021-12-22 09:25:28,922 INFO o.a.f.r.e.ClusterEntrypoint Shutting
StandaloneSessionClusterEntrypoint down with application status UNKNOWN.
Diagnostics Cluster entrypoint has been closed externally..
2021-12-22 09:25:28,923 INFO o.a.f.r.r.RestServerEndpoint Shutting down
rest endpoint.
2021-12-22 09:25:28,943 INFO o.a.f.r.b.BlobServer Stopped BLOB server at
0.0.0.0:64213

Process finished with exit code 239

On Wed, Dec 22, 2021 at 8:47 AM Yuval Itzchakov <yu...@gmail.com> wrote:

> I mean it finishes successfully and exits with status code 0. Both when
> running locally and submitting to the cluster.
>

-- 
Best Regards,
Yuval Itzchakov.

Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

Posted by Yuval Itzchakov <yu...@gmail.com>.
I mean it finishes successfully and exits with status code 0. Both when
running locally and submitting to the cluster.

On Wed, Dec 22, 2021, 08:36 Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> By "the streaming job stops" do you mean the job ends with CANCELED state
> instead of FINISHED state? Which kind of job are you running? Is it a
> select job or an insert job? Insert jobs should run continuously once
> they're submitted. Could you share your user code if possible?
>

Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

By "the streaming job stops" do you mean the job ends in the CANCELED state
instead of the FINISHED state? Which kind of job are you running? Is it a
select job or an insert job? Insert jobs should run continuously once
they're submitted. Could you share your user code if possible?

On Wed, Dec 22, 2021, 14:11 Yuval Itzchakov <yu...@gmail.com> wrote:

> Hi Caizhi,
>
> If I don't block on statementSet.execute(), the job finishes immediately
> with exit code 0 and the streaming job stops, which is not what I want. I
> need to block somehow.
>

Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Caizhi,

If I don't block on statementSet.execute(), the job finishes immediately
with exit code 0 and the streaming job stops, which is not what I want. I
need to block somehow.



On Wed, Dec 22, 2021, 03:43 Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> You can poll the status of that job with the REST API [1]. The FINISHED
> state tells you that the job completed successfully, and the FAILED state
> tells you that it failed.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid
>

Re: Waiting on streaming job with StatementSet.execute() when using Web Submission

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

You can poll the status of that job with the REST API [1]. The FINISHED
state tells you that the job completed successfully, and the FAILED state
tells you that it failed.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid
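
A rough sketch of what such polling could look like from a separate client
process, in Scala. Everything named here is an assumption for illustration:
the JobStatusPoller object and its methods are hypothetical, the REST base
URL has to be supplied by the caller (http://localhost:8081 is only Flink's
default REST port), and the "state" field is pulled out of the
/jobs/<jobid> response with a regex rather than a proper JSON parser.
CANCELED is treated as terminal alongside FINISHED and FAILED.

```scala
import scala.annotation.tailrec
import scala.io.Source

// Hypothetical helper, not part of Flink: polls GET /jobs/<jobid> until the
// job reports a terminal state.
object JobStatusPoller {

  // States after which the job will not run again (assumed set).
  val TerminalStates: Set[String] = Set("FINISHED", "FAILED", "CANCELED")

  // Matches the first "state" field in the job details JSON. A JSON library
  // would be more robust; a regex keeps the sketch dependency-free.
  private val StatePattern = "\"state\"\\s*:\\s*\"([A-Z_]+)\"".r

  // Extracts the job state from a response body, if one is present.
  def parseState(json: String): Option[String] =
    StatePattern.findFirstMatchIn(json).map(_.group(1))

  def isTerminal(state: String): Boolean = TerminalStates.contains(state)

  // Fetches the raw JSON for one job; restBase is e.g. "http://localhost:8081".
  def fetchJobJson(restBase: String, jobId: String): String = {
    val src = Source.fromURL(s"$restBase/jobs/$jobId", "UTF-8")
    try src.mkString finally src.close()
  }

  // Blocks the calling thread, sleeping intervalMs between requests, and
  // returns the terminal state once the job reaches one.
  @tailrec
  def awaitTerminalState(restBase: String, jobId: String, intervalMs: Long): String =
    parseState(fetchJobJson(restBase, jobId)) match {
      case Some(state) if isTerminal(state) => state
      case _ =>
        Thread.sleep(intervalMs)
        awaitTerminalState(restBase, jobId, intervalMs)
    }
}
```

The job ID would be the one printed at submission time. For an unbounded
streaming job, awaitTerminalState only returns if the job fails or is
canceled, and a connection error from the REST endpoint surfaces as an
exception rather than being retried.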

On Wed, Dec 22, 2021, 02:36 Yuval Itzchakov <yu...@gmail.com> wrote:

> Hi,
>
> Flink 1.14.2
> Scala 2.12
>
> I have a streaming job that executes, and I want to wait indefinitely for
> its completion, or for an exception thrown during initialization. When
> using *statementSet.execute().await()*, I get an error:
>
> Caused by: org.apache.flink.util.FlinkRuntimeException:* The Job Result
> cannot be fetched through the Job Client when in Web Submission.*
> at
> org.apache.flink.client.deployment.application.WebSubmissionJobClient.getJobExecutionResult(WebSubmissionJobClient.java:88)
> at
> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
> ... 7 more
>
> This is because the Web Submission via the REST API is using
> the WebSubmissionJobClient.
>
> How can I wait on my Flink SQL streaming job when submitting through the
> REST API?
> --
> Best Regards,
> Yuval Itzchakov
>