You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2020/01/30 15:15:10 UTC

Re: REST Monitoring Savepoint failed

Hi Ramya,

I think this message is better suited for the user ML list. Which version
of Flink are you using? Have you checked the Flink logs to see whether they
contain anything suspicious?

Cheers,
Till

On Thu, Jan 30, 2020 at 1:09 PM Ramya Ramamurthy <ha...@gmail.com> wrote:

> Hi,
>
> I am trying to dynamically increase the parallelism of the job. In the
> process of it, while I am trying to trigger the savepoint, i get
> the following error. Any help would be appreciated.
>
> The URL triggered is :
>
> http://stgflink.ssdev.in:8081/jobs/2865c1f40340bcb19a88a01d6ef8ff4f/savepoints/
> {
>     "target-directory" :
> "gs://xxxx-bucket/flink/flink-gcs/flink-savepoints",
>     "cancel-job" : "false"
> }
>
> Error as below:
>
>
> {"status":{"id":"COMPLETED"},"operation":{"failure-cause":{"class":"java.util.concurrent.CompletionException","stack-trace":"java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to
> trigger savepoint. Decline reason: An Exception occurred while triggering
> the checkpoint.\n\tat
>
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:972)\n\tat
>
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
>
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
>
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)\n\tat
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)\n\tat
>
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)\n\tat
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)\n\tat
>
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)\n\tat
>
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)\n\tat
> akka.actor.Actor$class.aroundReceive(Actor.scala:502)\n\tat
> akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)\n\tat
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)\n\tat
> akka.actor.ActorCell.invoke(ActorCell.scala:495)\n\tat
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)\n\tat akka.dis
>

Re: REST Monitoring Savepoint failed

Posted by Till Rohrmann <tr...@apache.org>.
At the moment this is the case Ramya. We plan to add the auto scaling
feature back again in one of the future Flink versions, though.

Cheers,
Till

On Mon, Feb 3, 2020 at 5:27 AM Ramya Ramamurthy <ha...@gmail.com> wrote:

> Thanks Till Rohrmann for the update.
>
> So even if we upgrade to the newer version of Flink, we have to manually
> rescale. Is that correct? that is to stop the job and start again with the
> desired parallelism.
>
> Thanks.
>
> On Fri, Jan 31, 2020 at 6:42 PM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Thanks for providing us with the logs Ramya. I think the problem is that
>> with FLINK-10354 [1], we accidentally broke the rescaling feature in Flink
>> >= 1.7.0. The problem is that before savepoints weren't used for recovery
>> and, hence, they were not part of the CompletedCheckpointStore. With
>> FLINK-10354, this changed and now the savepoints are part of the completed
>> checkpoint store. This breaks an assumption of the rescaling feature.
>>
>> I would recommend to manually rescale by (1) taking a savepoint, (2)
>> stopping the job, (3) resubmitting the job with the changed parallelism
>> resuming from the taken savepoint.
>>
>> A side note, the rescaling feature has been removed in Flink >= 1.9.0
>> because of some inherent limitations.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-10354
>>
>> Cheers,
>> Till
>>
>> On Fri, Jan 31, 2020 at 11:49 AM Ramya Ramamurthy <ha...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Please find the below steps.
>>>
>>> 1) Trigger a savepoint to GCS
>>> 2) Trigger /rescaling REST API to dynamically increase parallelism.
>>> 3) check on the trigger id received, we get the below error.
>>>
>>> 2020-01-31 10:35:44,211 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
>>> checkpoint 2 @ 1580466943982 for job 7a085511514ba68ad07de1945dbf40a2.
>>> 2020-01-31 10:35:44,648 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting
>>> job 7a085511514ba68ad07de1945dbf40a2 from savepoint
>>> gs://ss-enigma-bucket/flink/flink-gcs/flink-savepoints/savepoint-7a0855-de3002bc066b
>>> ()
>>> 2020-01-31 10:35:45,191 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
>>> checkpoint 2 for job 7a085511514ba68ad07de1945dbf40a2 (25449 bytes in 665
>>> ms).
>>> 2020-01-31 10:35:45,337 ERROR
>>> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Exception
>>> occurred in REST handler: Job c6dd27ec36392e4af8fa55066e16a2e2 not found
>>> 2020-01-31 10:35:45,395 INFO
>>>  org.apache.flink.runtime.jobmaster.JobMaster                  - Could not
>>> restore from temporary rescaling savepoint. This might indicate that the
>>> savepoint
>>> gs://ss-enigma-bucket/flink/flink-gcs/flink-savepoints/savepoint-7a0855-de3002bc066b
>>> got corrupted. Deleting this savepoint as a precaution.
>>> 2020-01-31 10:35:45,397 INFO
>>>  org.apache.flink.runtime.jobmaster.JobMaster                  - Attempting
>>> to load configured state backend for savepoint disposal
>>> 2020-01-31 10:35:45,397 INFO
>>>  org.apache.flink.runtime.jobmaster.JobMaster                  - No state
>>> backend configured, attempting to dispose savepoint with default backend
>>> (file system based)
>>> 2020-01-31 10:35:45,398 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job stuart
>>> (7a085511514ba68ad07de1945dbf40a2) switched from state CREATED to FAILING.
>>> org.apache.flink.runtime.execution.SuppressRestartsException:
>>> Unrecoverable failure. This suppresses job restarts. Please check the stack
>>> trace for the root cause.
>>>
>>> Attaching the file for reference. Job
>>> ID: 7a085511514ba68ad07de1945dbf40a2,
>>>
>>> Thanks.
>>>
>>>
>>>
>>> On Fri, Jan 31, 2020 at 3:20 PM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Ramya,
>>>>
>>>> could you share the logs with us?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Jan 31, 2020 at 9:31 AM Yun Tang <my...@live.com> wrote:
>>>>
>>>>> Hi Ramya
>>>>>
>>>>> Removed the dev mail list in receiver.
>>>>>
>>>>> Can you check the configuration of your "Job Manager" tab via web UI
>>>>> to see whether state.savepoints.dir [1] existed? If that existed, default
>>>>> savepoint directory is already given and such problem should not happen.
>>>>>
>>>>>
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#state-savepoints-dir
>>>>>
>>>>> Best
>>>>> Yun Tang
>>>>> ------------------------------
>>>>> *From:* Ramya Ramamurthy <ha...@gmail.com>
>>>>> *Sent:* Friday, January 31, 2020 15:03
>>>>> *To:* dev@flink.apache.org <de...@flink.apache.org>
>>>>> *Cc:* user <us...@flink.apache.org>
>>>>> *Subject:* Re: REST Monitoring Savepoint failed
>>>>>
>>>>> Hi Till,
>>>>>
>>>>> I am using flink 1.7.
>>>>> This is my observation.
>>>>>
>>>>> a) I first trigger a savepoint. this is stored on my Google cloud
>>>>> storage.
>>>>> b) When i invoke the rescale HTTP API, i get the error telling
>>>>> savepoints
>>>>> dir is not configured. But post triggering a), i could verify the
>>>>> savepoint
>>>>> directory present in GCS in the mentioned path.
>>>>>
>>>>> Below is the snapshot of my deployment file.
>>>>>
>>>>> Environment:
>>>>>       JOB_MANAGER_RPC_ADDRESS:                svc-flink-jobmanager-gcs
>>>>>       HIGH_AVAILABILITY:                      zookeeper
>>>>>       HIGH_AVAILABILITY_ZOOKEEPER:            zookeeper-dev-1:2181
>>>>>       HIGH_AVAILABILITY_ZOOKEEPER_PATH_ROOT:  /flink-gcs-chk
>>>>>       HIGH_AVAILABILITY_CLUSTER_ID:           fs.default_ns
>>>>>       HIGH_AVAILABILITY_STORAGEDIR:
>>>>> gs://xxxxx/flink/flink-gcs/checkpoints
>>>>>       HIGH_AVAILABILITY_JOBMANAGER_PORT:      6123
>>>>>       STATE_CHECKPOINTS_DIR:
>>>>>  gs://xxxxx/flink/flink-gcs/flink-checkpoints
>>>>>       STATE_SAVEPOINTS_DIR:
>>>>> gs://xxxxx/flink/flink-gcs/flink-savepoints
>>>>>
>>>>> Response to my Savepoints REST API is as below:
>>>>> {
>>>>>     "status": {
>>>>>         "id": "COMPLETED"
>>>>>     },
>>>>>     "operation": {
>>>>>         "location":
>>>>>
>>>>> "gs://xxxxx/flink/flink-gcs/flink-savepoints/savepoint-d02217-ec0aced1fe9c"
>>>>>     }
>>>>> }
>>>>>
>>>>> So why does the job doesnt recognize this savepoint directory ? Also,
>>>>> during this operation, i could see the Checkpoints directory for this
>>>>> job
>>>>> gets deleted. Post which, no checkpoints are happening. any thoughts
>>>>> here
>>>>> would really help us in progressing.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> On Thu, Jan 30, 2020 at 8:45 PM Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>> > Hi Ramya,
>>>>> >
>>>>> > I think this message is better suited for the user ML list. Which
>>>>> version
>>>>> > of Flink are you using? Have you checked the Flink logs to see
>>>>> whether they
>>>>> > contain anything suspicious?
>>>>> >
>>>>> > Cheers,
>>>>> > Till
>>>>> >
>>>>> > On Thu, Jan 30, 2020 at 1:09 PM Ramya Ramamurthy <ha...@gmail.com>
>>>>> > wrote:
>>>>> >
>>>>> > > Hi,
>>>>> > >
>>>>> > > I am trying to dynamically increase the parallelism of the job. In
>>>>> the
>>>>> > > process of it, while I am trying to trigger the savepoint, i get
>>>>> > > the following error. Any help would be appreciated.
>>>>> > >
>>>>> > > The URL triggered is :
>>>>> > >
>>>>> > >
>>>>> >
>>>>> http://stgflink.ssdev.in:8081/jobs/2865c1f40340bcb19a88a01d6ef8ff4f/savepoints/
>>>>> > > {
>>>>> > >     "target-directory" :
>>>>> > > "gs://xxxx-bucket/flink/flink-gcs/flink-savepoints",
>>>>> > >     "cancel-job" : "false"
>>>>> > > }
>>>>> > >
>>>>> > > Error as below:
>>>>> > >
>>>>> > >
>>>>> > >
>>>>> >
>>>>> {"status":{"id":"COMPLETED"},"operation":{"failure-cause":{"class":"java.util.concurrent.CompletionException","stack-trace":"java.util.concurrent.CompletionException:
>>>>> > > java.util.concurrent.CompletionException:
>>>>> > > org.apache.flink.runtime.checkpoint.CheckpointTriggerException:
>>>>> Failed to
>>>>> > > trigger savepoint. Decline reason: An Exception occurred while
>>>>> triggering
>>>>> > > the checkpoint.\n\tat
>>>>> > >
>>>>> > >
>>>>> >
>>>>> org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:972)\n\tat
>>>>> > >
>>>>> > >
>>>>> >
>>>>> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
>>>>> > >
>>>>> > >
>>>>> >
>>>>> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
>>>>> > >
>>>>> > >
>>>>> >
>>>>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
>>>>> > >
>>>>> > >
>>>>> >
>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)\n\tat
>>>>> > >
>>>>> > >
>>>>> >
>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)\n\tat
>>>>> > >
>>>>> > >
>>>>> >
>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)\n\tat
>>>>> > >
>>>>> > >
>>>>> >
>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)\n\tat
>>>>> > >
>>>>> > >
>>>>> >
>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)\n\tat
>>>>> > >
>>>>> > >
>>>>> >
>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)\n\tat
>>>>> > > akka.actor.Actor$class.aroundReceive(Actor.scala:502)\n\tat
>>>>> > > akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)\n\tat
>>>>> > > akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)\n\tat
>>>>> > > akka.actor.ActorCell.invoke(ActorCell.scala:495)\n\tat
>>>>> > > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)\n\tat
>>>>> akka.dis
>>>>> > >
>>>>> >
>>>>>
>>>>

Re: REST Monitoring Savepoint failed

Posted by Ramya Ramamurthy <ha...@gmail.com>.
Thanks Till Rohrmann for the update.

So even if we upgrade to the newer version of Flink, we have to manually
rescale. Is that correct? that is to stop the job and start again with the
desired parallelism.

Thanks.

On Fri, Jan 31, 2020 at 6:42 PM Till Rohrmann <tr...@apache.org> wrote:

> Thanks for providing us with the logs Ramya. I think the problem is that
> with FLINK-10354 [1], we accidentally broke the rescaling feature in Flink
> >= 1.7.0. The problem is that before savepoints weren't used for recovery
> and, hence, they were not part of the CompletedCheckpointStore. With
> FLINK-10354, this changed and now the savepoints are part of the completed
> checkpoint store. This breaks an assumption of the rescaling feature.
>
> I would recommend to manually rescale by (1) taking a savepoint, (2)
> stopping the job, (3) resubmitting the job with the changed parallelism
> resuming from the taken savepoint.
>
> A side note, the rescaling feature has been removed in Flink >= 1.9.0
> because of some inherent limitations.
>
> [1] https://issues.apache.org/jira/browse/FLINK-10354
>
> Cheers,
> Till
>
> On Fri, Jan 31, 2020 at 11:49 AM Ramya Ramamurthy <ha...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Please find the below steps.
>>
>> 1) Trigger a savepoint to GCS
>> 2) Trigger /rescaling REST API to dynamically increase parallelism.
>> 3) check on the trigger id received, we get the below error.
>>
>> 2020-01-31 10:35:44,211 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
>> checkpoint 2 @ 1580466943982 for job 7a085511514ba68ad07de1945dbf40a2.
>> 2020-01-31 10:35:44,648 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting
>> job 7a085511514ba68ad07de1945dbf40a2 from savepoint
>> gs://ss-enigma-bucket/flink/flink-gcs/flink-savepoints/savepoint-7a0855-de3002bc066b
>> ()
>> 2020-01-31 10:35:45,191 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
>> checkpoint 2 for job 7a085511514ba68ad07de1945dbf40a2 (25449 bytes in 665
>> ms).
>> 2020-01-31 10:35:45,337 ERROR
>> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Exception
>> occurred in REST handler: Job c6dd27ec36392e4af8fa55066e16a2e2 not found
>> 2020-01-31 10:35:45,395 INFO
>>  org.apache.flink.runtime.jobmaster.JobMaster                  - Could not
>> restore from temporary rescaling savepoint. This might indicate that the
>> savepoint
>> gs://ss-enigma-bucket/flink/flink-gcs/flink-savepoints/savepoint-7a0855-de3002bc066b
>> got corrupted. Deleting this savepoint as a precaution.
>> 2020-01-31 10:35:45,397 INFO
>>  org.apache.flink.runtime.jobmaster.JobMaster                  - Attempting
>> to load configured state backend for savepoint disposal
>> 2020-01-31 10:35:45,397 INFO
>>  org.apache.flink.runtime.jobmaster.JobMaster                  - No state
>> backend configured, attempting to dispose savepoint with default backend
>> (file system based)
>> 2020-01-31 10:35:45,398 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job stuart
>> (7a085511514ba68ad07de1945dbf40a2) switched from state CREATED to FAILING.
>> org.apache.flink.runtime.execution.SuppressRestartsException:
>> Unrecoverable failure. This suppresses job restarts. Please check the stack
>> trace for the root cause.
>>
>> Attaching the file for reference. Job
>> ID: 7a085511514ba68ad07de1945dbf40a2,
>>
>> Thanks.
>>
>>
>>
>> On Fri, Jan 31, 2020 at 3:20 PM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Ramya,
>>>
>>> could you share the logs with us?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Jan 31, 2020 at 9:31 AM Yun Tang <my...@live.com> wrote:
>>>
>>>> Hi Ramya
>>>>
>>>> Removed the dev mail list in receiver.
>>>>
>>>> Can you check the configuration of your "Job Manager" tab via web UI to
>>>> see whether state.savepoints.dir [1] existed? If that existed, default
>>>> savepoint directory is already given and such problem should not happen.
>>>>
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#state-savepoints-dir
>>>>
>>>> Best
>>>> Yun Tang
>>>> ------------------------------
>>>> *From:* Ramya Ramamurthy <ha...@gmail.com>
>>>> *Sent:* Friday, January 31, 2020 15:03
>>>> *To:* dev@flink.apache.org <de...@flink.apache.org>
>>>> *Cc:* user <us...@flink.apache.org>
>>>> *Subject:* Re: REST Monitoring Savepoint failed
>>>>
>>>> Hi Till,
>>>>
>>>> I am using flink 1.7.
>>>> This is my observation.
>>>>
>>>> a) I first trigger a savepoint. this is stored on my Google cloud
>>>> storage.
>>>> b) When i invoke the rescale HTTP API, i get the error telling
>>>> savepoints
>>>> dir is not configured. But post triggering a), i could verify the
>>>> savepoint
>>>> directory present in GCS in the mentioned path.
>>>>
>>>> Below is the snapshot of my deployment file.
>>>>
>>>> Environment:
>>>>       JOB_MANAGER_RPC_ADDRESS:                svc-flink-jobmanager-gcs
>>>>       HIGH_AVAILABILITY:                      zookeeper
>>>>       HIGH_AVAILABILITY_ZOOKEEPER:            zookeeper-dev-1:2181
>>>>       HIGH_AVAILABILITY_ZOOKEEPER_PATH_ROOT:  /flink-gcs-chk
>>>>       HIGH_AVAILABILITY_CLUSTER_ID:           fs.default_ns
>>>>       HIGH_AVAILABILITY_STORAGEDIR:
>>>> gs://xxxxx/flink/flink-gcs/checkpoints
>>>>       HIGH_AVAILABILITY_JOBMANAGER_PORT:      6123
>>>>       STATE_CHECKPOINTS_DIR:
>>>>  gs://xxxxx/flink/flink-gcs/flink-checkpoints
>>>>       STATE_SAVEPOINTS_DIR:
>>>> gs://xxxxx/flink/flink-gcs/flink-savepoints
>>>>
>>>> Response to my Savepoints REST API is as below:
>>>> {
>>>>     "status": {
>>>>         "id": "COMPLETED"
>>>>     },
>>>>     "operation": {
>>>>         "location":
>>>>
>>>> "gs://xxxxx/flink/flink-gcs/flink-savepoints/savepoint-d02217-ec0aced1fe9c"
>>>>     }
>>>> }
>>>>
>>>> So why does the job doesnt recognize this savepoint directory ? Also,
>>>> during this operation, i could see the Checkpoints directory for this
>>>> job
>>>> gets deleted. Post which, no checkpoints are happening. any thoughts
>>>> here
>>>> would really help us in progressing.
>>>>
>>>> Thanks,
>>>>
>>>> On Thu, Jan 30, 2020 at 8:45 PM Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>> > Hi Ramya,
>>>> >
>>>> > I think this message is better suited for the user ML list. Which
>>>> version
>>>> > of Flink are you using? Have you checked the Flink logs to see
>>>> whether they
>>>> > contain anything suspicious?
>>>> >
>>>> > Cheers,
>>>> > Till
>>>> >
>>>> > On Thu, Jan 30, 2020 at 1:09 PM Ramya Ramamurthy <ha...@gmail.com>
>>>> > wrote:
>>>> >
>>>> > > Hi,
>>>> > >
>>>> > > I am trying to dynamically increase the parallelism of the job. In
>>>> the
>>>> > > process of it, while I am trying to trigger the savepoint, i get
>>>> > > the following error. Any help would be appreciated.
>>>> > >
>>>> > > The URL triggered is :
>>>> > >
>>>> > >
>>>> >
>>>> http://stgflink.ssdev.in:8081/jobs/2865c1f40340bcb19a88a01d6ef8ff4f/savepoints/
>>>> > > {
>>>> > >     "target-directory" :
>>>> > > "gs://xxxx-bucket/flink/flink-gcs/flink-savepoints",
>>>> > >     "cancel-job" : "false"
>>>> > > }
>>>> > >
>>>> > > Error as below:
>>>> > >
>>>> > >
>>>> > >
>>>> >
>>>> {"status":{"id":"COMPLETED"},"operation":{"failure-cause":{"class":"java.util.concurrent.CompletionException","stack-trace":"java.util.concurrent.CompletionException:
>>>> > > java.util.concurrent.CompletionException:
>>>> > > org.apache.flink.runtime.checkpoint.CheckpointTriggerException:
>>>> Failed to
>>>> > > trigger savepoint. Decline reason: An Exception occurred while
>>>> triggering
>>>> > > the checkpoint.\n\tat
>>>> > >
>>>> > >
>>>> >
>>>> org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:972)\n\tat
>>>> > >
>>>> > >
>>>> >
>>>> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
>>>> > >
>>>> > >
>>>> >
>>>> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
>>>> > >
>>>> > >
>>>> >
>>>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
>>>> > >
>>>> > >
>>>> >
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)\n\tat
>>>> > >
>>>> > >
>>>> >
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)\n\tat
>>>> > >
>>>> > >
>>>> >
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)\n\tat
>>>> > >
>>>> > >
>>>> >
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)\n\tat
>>>> > >
>>>> > >
>>>> >
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)\n\tat
>>>> > >
>>>> > >
>>>> >
>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)\n\tat
>>>> > > akka.actor.Actor$class.aroundReceive(Actor.scala:502)\n\tat
>>>> > > akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)\n\tat
>>>> > > akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)\n\tat
>>>> > > akka.actor.ActorCell.invoke(ActorCell.scala:495)\n\tat
>>>> > > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)\n\tat
>>>> akka.dis
>>>> > >
>>>> >
>>>>
>>>

Re: REST Monitoring Savepoint failed

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for providing us with the logs Ramya. I think the problem is that
with FLINK-10354 [1], we accidentally broke the rescaling feature in Flink
>= 1.7.0. The problem is that before savepoints weren't used for recovery
and, hence, they were not part of the CompletedCheckpointStore. With
FLINK-10354, this changed and now the savepoints are part of the completed
checkpoint store. This breaks an assumption of the rescaling feature.

I would recommend to manually rescale by (1) taking a savepoint, (2)
stopping the job, (3) resubmitting the job with the changed parallelism
resuming from the taken savepoint.

A side note, the rescaling feature has been removed in Flink >= 1.9.0
because of some inherent limitations.

[1] https://issues.apache.org/jira/browse/FLINK-10354

Cheers,
Till

On Fri, Jan 31, 2020 at 11:49 AM Ramya Ramamurthy <ha...@gmail.com> wrote:

> Hi,
>
> Please find the below steps.
>
> 1) Trigger a savepoint to GCS
> 2) Trigger /rescaling REST API to dynamically increase parallelism.
> 3) check on the trigger id received, we get the below error.
>
> 2020-01-31 10:35:44,211 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
> checkpoint 2 @ 1580466943982 for job 7a085511514ba68ad07de1945dbf40a2.
> 2020-01-31 10:35:44,648 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting
> job 7a085511514ba68ad07de1945dbf40a2 from savepoint
> gs://ss-enigma-bucket/flink/flink-gcs/flink-savepoints/savepoint-7a0855-de3002bc066b
> ()
> 2020-01-31 10:35:45,191 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
> checkpoint 2 for job 7a085511514ba68ad07de1945dbf40a2 (25449 bytes in 665
> ms).
> 2020-01-31 10:35:45,337 ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Exception
> occurred in REST handler: Job c6dd27ec36392e4af8fa55066e16a2e2 not found
> 2020-01-31 10:35:45,395 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>                  - Could not restore from temporary rescaling savepoint.
> This might indicate that the savepoint
> gs://ss-enigma-bucket/flink/flink-gcs/flink-savepoints/savepoint-7a0855-de3002bc066b
> got corrupted. Deleting this savepoint as a precaution.
> 2020-01-31 10:35:45,397 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>                  - Attempting to load configured state backend for
> savepoint disposal
> 2020-01-31 10:35:45,397 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>                  - No state backend configured, attempting to dispose
> savepoint with default backend (file system based)
> 2020-01-31 10:35:45,398 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job stuart
> (7a085511514ba68ad07de1945dbf40a2) switched from state CREATED to FAILING.
> org.apache.flink.runtime.execution.SuppressRestartsException:
> Unrecoverable failure. This suppresses job restarts. Please check the stack
> trace for the root cause.
>
> Attaching the file for reference. Job ID: 7a085511514ba68ad07de1945dbf40a2,
>
> Thanks.
>
>
>
> On Fri, Jan 31, 2020 at 3:20 PM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Ramya,
>>
>> could you share the logs with us?
>>
>> Cheers,
>> Till
>>
>> On Fri, Jan 31, 2020 at 9:31 AM Yun Tang <my...@live.com> wrote:
>>
>>> Hi Ramya
>>>
>>> Removed the dev mail list in receiver.
>>>
>>> Can you check the configuration of your "Job Manager" tab via web UI to
>>> see whether state.savepoints.dir [1] existed? If that existed, default
>>> savepoint directory is already given and such problem should not happen.
>>>
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#state-savepoints-dir
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Ramya Ramamurthy <ha...@gmail.com>
>>> *Sent:* Friday, January 31, 2020 15:03
>>> *To:* dev@flink.apache.org <de...@flink.apache.org>
>>> *Cc:* user <us...@flink.apache.org>
>>> *Subject:* Re: REST Monitoring Savepoint failed
>>>
>>> Hi Till,
>>>
>>> I am using flink 1.7.
>>> This is my observation.
>>>
>>> a) I first trigger a savepoint. this is stored on my Google cloud
>>> storage.
>>> b) When i invoke the rescale HTTP API, i get the error telling savepoints
>>> dir is not configured. But post triggering a), i could verify the
>>> savepoint
>>> directory present in GCS in the mentioned path.
>>>
>>> Below is the snapshot of my deployment file.
>>>
>>> Environment:
>>>       JOB_MANAGER_RPC_ADDRESS:                svc-flink-jobmanager-gcs
>>>       HIGH_AVAILABILITY:                      zookeeper
>>>       HIGH_AVAILABILITY_ZOOKEEPER:            zookeeper-dev-1:2181
>>>       HIGH_AVAILABILITY_ZOOKEEPER_PATH_ROOT:  /flink-gcs-chk
>>>       HIGH_AVAILABILITY_CLUSTER_ID:           fs.default_ns
>>>       HIGH_AVAILABILITY_STORAGEDIR:
>>> gs://xxxxx/flink/flink-gcs/checkpoints
>>>       HIGH_AVAILABILITY_JOBMANAGER_PORT:      6123
>>>       STATE_CHECKPOINTS_DIR:
>>>  gs://xxxxx/flink/flink-gcs/flink-checkpoints
>>>       STATE_SAVEPOINTS_DIR:
>>> gs://xxxxx/flink/flink-gcs/flink-savepoints
>>>
>>> Response to my Savepoints REST API is as below:
>>> {
>>>     "status": {
>>>         "id": "COMPLETED"
>>>     },
>>>     "operation": {
>>>         "location":
>>>
>>> "gs://xxxxx/flink/flink-gcs/flink-savepoints/savepoint-d02217-ec0aced1fe9c"
>>>     }
>>> }
>>>
>>> So why does the job doesnt recognize this savepoint directory ? Also,
>>> during this operation, i could see the Checkpoints directory for this job
>>> gets deleted. Post which, no checkpoints are happening. any thoughts here
>>> would really help us in progressing.
>>>
>>> Thanks,
>>>
>>> On Thu, Jan 30, 2020 at 8:45 PM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>> > Hi Ramya,
>>> >
>>> > I think this message is better suited for the user ML list. Which
>>> version
>>> > of Flink are you using? Have you checked the Flink logs to see whether
>>> they
>>> > contain anything suspicious?
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Thu, Jan 30, 2020 at 1:09 PM Ramya Ramamurthy <ha...@gmail.com>
>>> > wrote:
>>> >
>>> > > Hi,
>>> > >
>>> > > I am trying to dynamically increase the parallelism of the job. In
>>> the
>>> > > process of it, while I am trying to trigger the savepoint, i get
>>> > > the following error. Any help would be appreciated.
>>> > >
>>> > > The URL triggered is :
>>> > >
>>> > >
>>> >
>>> http://stgflink.ssdev.in:8081/jobs/2865c1f40340bcb19a88a01d6ef8ff4f/savepoints/
>>> > > {
>>> > >     "target-directory" :
>>> > > "gs://xxxx-bucket/flink/flink-gcs/flink-savepoints",
>>> > >     "cancel-job" : "false"
>>> > > }
>>> > >
>>> > > Error as below:
>>> > >
>>> > >
>>> > >
>>> >
>>> {"status":{"id":"COMPLETED"},"operation":{"failure-cause":{"class":"java.util.concurrent.CompletionException","stack-trace":"java.util.concurrent.CompletionException:
>>> > > java.util.concurrent.CompletionException:
>>> > > org.apache.flink.runtime.checkpoint.CheckpointTriggerException:
>>> Failed to
>>> > > trigger savepoint. Decline reason: An Exception occurred while
>>> triggering
>>> > > the checkpoint.\n\tat
>>> > >
>>> > >
>>> >
>>> org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:972)\n\tat
>>> > >
>>> > >
>>> >
>>> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
>>> > >
>>> > >
>>> >
>>> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
>>> > >
>>> > >
>>> >
>>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
>>> > >
>>> > >
>>> >
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)\n\tat
>>> > >
>>> > >
>>> >
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)\n\tat
>>> > >
>>> > >
>>> >
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)\n\tat
>>> > >
>>> > >
>>> >
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)\n\tat
>>> > >
>>> > >
>>> >
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)\n\tat
>>> > >
>>> > >
>>> >
>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)\n\tat
>>> > > akka.actor.Actor$class.aroundReceive(Actor.scala:502)\n\tat
>>> > > akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)\n\tat
>>> > > akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)\n\tat
>>> > > akka.actor.ActorCell.invoke(ActorCell.scala:495)\n\tat
>>> > > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)\n\tat
>>> akka.dis
>>> > >
>>> >
>>>
>>

Re: REST Monitoring Savepoint failed

Posted by Till Rohrmann <tr...@apache.org>.
Hi Ramya,

could you share the logs with us?

Cheers,
Till

On Fri, Jan 31, 2020 at 9:31 AM Yun Tang <my...@live.com> wrote:

> Hi Ramya
>
> Removed the dev mail list in receiver.
>
> Can you check the configuration of your "Job Manager" tab via web UI to
> see whether state.savepoints.dir [1] existed? If that existed, default
> savepoint directory is already given and such problem should not happen.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#state-savepoints-dir
>
> Best
> Yun Tang
> ------------------------------
> *From:* Ramya Ramamurthy <ha...@gmail.com>
> *Sent:* Friday, January 31, 2020 15:03
> *To:* dev@flink.apache.org <de...@flink.apache.org>
> *Cc:* user <us...@flink.apache.org>
> *Subject:* Re: REST Monitoring Savepoint failed
>
> Hi Till,
>
> I am using flink 1.7.
> This is my observation.
>
> a) I first trigger a savepoint. this is stored on my Google cloud storage.
> b) When i invoke the rescale HTTP API, i get the error telling savepoints
> dir is not configured. But post triggering a), i could verify the savepoint
> directory present in GCS in the mentioned path.
>
> Below is the snapshot of my deployment file.
>
> Environment:
>       JOB_MANAGER_RPC_ADDRESS:                svc-flink-jobmanager-gcs
>       HIGH_AVAILABILITY:                      zookeeper
>       HIGH_AVAILABILITY_ZOOKEEPER:            zookeeper-dev-1:2181
>       HIGH_AVAILABILITY_ZOOKEEPER_PATH_ROOT:  /flink-gcs-chk
>       HIGH_AVAILABILITY_CLUSTER_ID:           fs.default_ns
>       HIGH_AVAILABILITY_STORAGEDIR:
> gs://xxxxx/flink/flink-gcs/checkpoints
>       HIGH_AVAILABILITY_JOBMANAGER_PORT:      6123
>       STATE_CHECKPOINTS_DIR:
>  gs://xxxxx/flink/flink-gcs/flink-checkpoints
>       STATE_SAVEPOINTS_DIR:
> gs://xxxxx/flink/flink-gcs/flink-savepoints
>
> Response to my Savepoints REST API is as below:
> {
>     "status": {
>         "id": "COMPLETED"
>     },
>     "operation": {
>         "location":
> "gs://xxxxx/flink/flink-gcs/flink-savepoints/savepoint-d02217-ec0aced1fe9c"
>     }
> }
>
> So why does the job doesnt recognize this savepoint directory ? Also,
> during this operation, i could see the Checkpoints directory for this job
> gets deleted. Post which, no checkpoints are happening. any thoughts here
> would really help us in progressing.
>
> Thanks,
>
> On Thu, Jan 30, 2020 at 8:45 PM Till Rohrmann <tr...@apache.org>
> wrote:
>
> > Hi Ramya,
> >
> > I think this message is better suited for the user ML list. Which version
> > of Flink are you using? Have you checked the Flink logs to see whether
> they
> > contain anything suspicious?
> >
> > Cheers,
> > Till
> >
> > On Thu, Jan 30, 2020 at 1:09 PM Ramya Ramamurthy <ha...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I am trying to dynamically increase the parallelism of the job. In the
> > > process of it, while I am trying to trigger the savepoint, i get
> > > the following error. Any help would be appreciated.
> > >
> > > The URL triggered is :
> > >
> > >
> >
> http://stgflink.ssdev.in:8081/jobs/2865c1f40340bcb19a88a01d6ef8ff4f/savepoints/
> > > {
> > >     "target-directory" :
> > > "gs://xxxx-bucket/flink/flink-gcs/flink-savepoints",
> > >     "cancel-job" : "false"
> > > }
> > >
> > > Error as below:
> > >
> > >
> > >
> >
> {"status":{"id":"COMPLETED"},"operation":{"failure-cause":{"class":"java.util.concurrent.CompletionException","stack-trace":"java.util.concurrent.CompletionException:
> > > java.util.concurrent.CompletionException:
> > > org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed
> to
> > > trigger savepoint. Decline reason: An Exception occurred while
> triggering
> > > the checkpoint.\n\tat
> > >
> > >
> >
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:972)\n\tat
> > >
> > >
> >
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
> > >
> > >
> >
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
> > >
> > >
> >
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
> > >
> > >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)\n\tat
> > >
> > >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)\n\tat
> > >
> > >
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)\n\tat
> > >
> > >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)\n\tat
> > >
> > >
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)\n\tat
> > >
> > >
> >
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)\n\tat
> > > akka.actor.Actor$class.aroundReceive(Actor.scala:502)\n\tat
> > > akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)\n\tat
> > > akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)\n\tat
> > > akka.actor.ActorCell.invoke(ActorCell.scala:495)\n\tat
> > > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)\n\tat akka.dis
> > >
> >
>

Re: REST Monitoring Savepoint failed

Posted by Yun Tang <my...@live.com>.
Hi Ramya

Removed the dev mail list in receiver.

Can you check the configuration of your "Job Manager" tab via web UI to see whether state.savepoints.dir [1] existed? If that existed, default savepoint directory is already given and such problem should not happen.

[cid:fc2f6504-ada9-46fa-a0ff-a50f8aee7b31]

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#state-savepoints-dir

Best
Yun Tang
________________________________
From: Ramya Ramamurthy <ha...@gmail.com>
Sent: Friday, January 31, 2020 15:03
To: dev@flink.apache.org <de...@flink.apache.org>
Cc: user <us...@flink.apache.org>
Subject: Re: REST Monitoring Savepoint failed

Hi Till,

I am using flink 1.7.
This is my observation.

a) I first trigger a savepoint. this is stored on my Google cloud storage.
b) When i invoke the rescale HTTP API, i get the error telling savepoints
dir is not configured. But post triggering a), i could verify the savepoint
directory present in GCS in the mentioned path.

Below is the snapshot of my deployment file.

Environment:
      JOB_MANAGER_RPC_ADDRESS:                svc-flink-jobmanager-gcs
      HIGH_AVAILABILITY:                      zookeeper
      HIGH_AVAILABILITY_ZOOKEEPER:            zookeeper-dev-1:2181
      HIGH_AVAILABILITY_ZOOKEEPER_PATH_ROOT:  /flink-gcs-chk
      HIGH_AVAILABILITY_CLUSTER_ID:           fs.default_ns
      HIGH_AVAILABILITY_STORAGEDIR:
gs://xxxxx/flink/flink-gcs/checkpoints
      HIGH_AVAILABILITY_JOBMANAGER_PORT:      6123
      STATE_CHECKPOINTS_DIR:
 gs://xxxxx/flink/flink-gcs/flink-checkpoints
      STATE_SAVEPOINTS_DIR:
gs://xxxxx/flink/flink-gcs/flink-savepoints

Response to my Savepoints REST API is as below:
{
    "status": {
        "id": "COMPLETED"
    },
    "operation": {
        "location":
"gs://xxxxx/flink/flink-gcs/flink-savepoints/savepoint-d02217-ec0aced1fe9c"
    }
}

So why does the job doesnt recognize this savepoint directory ? Also,
during this operation, i could see the Checkpoints directory for this job
gets deleted. Post which, no checkpoints are happening. any thoughts here
would really help us in progressing.

Thanks,

On Thu, Jan 30, 2020 at 8:45 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Ramya,
>
> I think this message is better suited for the user ML list. Which version
> of Flink are you using? Have you checked the Flink logs to see whether they
> contain anything suspicious?
>
> Cheers,
> Till
>
> On Thu, Jan 30, 2020 at 1:09 PM Ramya Ramamurthy <ha...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I am trying to dynamically increase the parallelism of the job. In the
> > process of it, while I am trying to trigger the savepoint, i get
> > the following error. Any help would be appreciated.
> >
> > The URL triggered is :
> >
> >
> http://stgflink.ssdev.in:8081/jobs/2865c1f40340bcb19a88a01d6ef8ff4f/savepoints/
> > {
> >     "target-directory" :
> > "gs://xxxx-bucket/flink/flink-gcs/flink-savepoints",
> >     "cancel-job" : "false"
> > }
> >
> > Error as below:
> >
> >
> >
> {"status":{"id":"COMPLETED"},"operation":{"failure-cause":{"class":"java.util.concurrent.CompletionException","stack-trace":"java.util.concurrent.CompletionException:
> > java.util.concurrent.CompletionException:
> > org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to
> > trigger savepoint. Decline reason: An Exception occurred while triggering
> > the checkpoint.\n\tat
> >
> >
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:972)\n\tat
> >
> >
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
> >
> >
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
> >
> >
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)\n\tat
> >
> >
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)\n\tat
> > akka.actor.Actor$class.aroundReceive(Actor.scala:502)\n\tat
> > akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)\n\tat
> > akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)\n\tat
> > akka.actor.ActorCell.invoke(ActorCell.scala:495)\n\tat
> > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)\n\tat akka.dis
> >
>

Re: REST Monitoring Savepoint failed

Posted by Ramya Ramamurthy <ha...@gmail.com>.
Hi Till,

I am using flink 1.7.
This is my observation.

a) I first trigger a savepoint. this is stored on my Google cloud storage.
b) When i invoke the rescale HTTP API, i get the error telling savepoints
dir is not configured. But post triggering a), i could verify the savepoint
directory present in GCS in the mentioned path.

Below is the snapshot of my deployment file.

Environment:
      JOB_MANAGER_RPC_ADDRESS:                svc-flink-jobmanager-gcs
      HIGH_AVAILABILITY:                      zookeeper
      HIGH_AVAILABILITY_ZOOKEEPER:            zookeeper-dev-1:2181
      HIGH_AVAILABILITY_ZOOKEEPER_PATH_ROOT:  /flink-gcs-chk
      HIGH_AVAILABILITY_CLUSTER_ID:           fs.default_ns
      HIGH_AVAILABILITY_STORAGEDIR:
gs://xxxxx/flink/flink-gcs/checkpoints
      HIGH_AVAILABILITY_JOBMANAGER_PORT:      6123
      STATE_CHECKPOINTS_DIR:
 gs://xxxxx/flink/flink-gcs/flink-checkpoints
      STATE_SAVEPOINTS_DIR:
gs://xxxxx/flink/flink-gcs/flink-savepoints

Response to my Savepoints REST API is as below:
{
    "status": {
        "id": "COMPLETED"
    },
    "operation": {
        "location":
"gs://xxxxx/flink/flink-gcs/flink-savepoints/savepoint-d02217-ec0aced1fe9c"
    }
}

So why does the job doesnt recognize this savepoint directory ? Also,
during this operation, i could see the Checkpoints directory for this job
gets deleted. Post which, no checkpoints are happening. any thoughts here
would really help us in progressing.

Thanks,

On Thu, Jan 30, 2020 at 8:45 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Ramya,
>
> I think this message is better suited for the user ML list. Which version
> of Flink are you using? Have you checked the Flink logs to see whether they
> contain anything suspicious?
>
> Cheers,
> Till
>
> On Thu, Jan 30, 2020 at 1:09 PM Ramya Ramamurthy <ha...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I am trying to dynamically increase the parallelism of the job. In the
> > process of it, while I am trying to trigger the savepoint, i get
> > the following error. Any help would be appreciated.
> >
> > The URL triggered is :
> >
> >
> http://stgflink.ssdev.in:8081/jobs/2865c1f40340bcb19a88a01d6ef8ff4f/savepoints/
> > {
> >     "target-directory" :
> > "gs://xxxx-bucket/flink/flink-gcs/flink-savepoints",
> >     "cancel-job" : "false"
> > }
> >
> > Error as below:
> >
> >
> >
> {"status":{"id":"COMPLETED"},"operation":{"failure-cause":{"class":"java.util.concurrent.CompletionException","stack-trace":"java.util.concurrent.CompletionException:
> > java.util.concurrent.CompletionException:
> > org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to
> > trigger savepoint. Decline reason: An Exception occurred while triggering
> > the checkpoint.\n\tat
> >
> >
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:972)\n\tat
> >
> >
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
> >
> >
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
> >
> >
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)\n\tat
> >
> >
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)\n\tat
> > akka.actor.Actor$class.aroundReceive(Actor.scala:502)\n\tat
> > akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)\n\tat
> > akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)\n\tat
> > akka.actor.ActorCell.invoke(ActorCell.scala:495)\n\tat
> > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)\n\tat akka.dis
> >
>

Re: REST Monitoring Savepoint failed

Posted by Ramya Ramamurthy <ha...@gmail.com>.
Hi Till,

I am using flink 1.7.
This is my observation.

a) I first trigger a savepoint. this is stored on my Google cloud storage.
b) When i invoke the rescale HTTP API, i get the error telling savepoints
dir is not configured. But post triggering a), i could verify the savepoint
directory present in GCS in the mentioned path.

Below is the snapshot of my deployment file.

Environment:
      JOB_MANAGER_RPC_ADDRESS:                svc-flink-jobmanager-gcs
      HIGH_AVAILABILITY:                      zookeeper
      HIGH_AVAILABILITY_ZOOKEEPER:            zookeeper-dev-1:2181
      HIGH_AVAILABILITY_ZOOKEEPER_PATH_ROOT:  /flink-gcs-chk
      HIGH_AVAILABILITY_CLUSTER_ID:           fs.default_ns
      HIGH_AVAILABILITY_STORAGEDIR:
gs://xxxxx/flink/flink-gcs/checkpoints
      HIGH_AVAILABILITY_JOBMANAGER_PORT:      6123
      STATE_CHECKPOINTS_DIR:
 gs://xxxxx/flink/flink-gcs/flink-checkpoints
      STATE_SAVEPOINTS_DIR:
gs://xxxxx/flink/flink-gcs/flink-savepoints

Response to my Savepoints REST API is as below:
{
    "status": {
        "id": "COMPLETED"
    },
    "operation": {
        "location":
"gs://xxxxx/flink/flink-gcs/flink-savepoints/savepoint-d02217-ec0aced1fe9c"
    }
}

So why does the job doesnt recognize this savepoint directory ? Also,
during this operation, i could see the Checkpoints directory for this job
gets deleted. Post which, no checkpoints are happening. any thoughts here
would really help us in progressing.

Thanks,

On Thu, Jan 30, 2020 at 8:45 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Ramya,
>
> I think this message is better suited for the user ML list. Which version
> of Flink are you using? Have you checked the Flink logs to see whether they
> contain anything suspicious?
>
> Cheers,
> Till
>
> On Thu, Jan 30, 2020 at 1:09 PM Ramya Ramamurthy <ha...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I am trying to dynamically increase the parallelism of the job. In the
> > process of it, while I am trying to trigger the savepoint, i get
> > the following error. Any help would be appreciated.
> >
> > The URL triggered is :
> >
> >
> http://stgflink.ssdev.in:8081/jobs/2865c1f40340bcb19a88a01d6ef8ff4f/savepoints/
> > {
> >     "target-directory" :
> > "gs://xxxx-bucket/flink/flink-gcs/flink-savepoints",
> >     "cancel-job" : "false"
> > }
> >
> > Error as below:
> >
> >
> >
> {"status":{"id":"COMPLETED"},"operation":{"failure-cause":{"class":"java.util.concurrent.CompletionException","stack-trace":"java.util.concurrent.CompletionException:
> > java.util.concurrent.CompletionException:
> > org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to
> > trigger savepoint. Decline reason: An Exception occurred while triggering
> > the checkpoint.\n\tat
> >
> >
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:972)\n\tat
> >
> >
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
> >
> >
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
> >
> >
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)\n\tat
> >
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)\n\tat
> >
> >
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)\n\tat
> > akka.actor.Actor$class.aroundReceive(Actor.scala:502)\n\tat
> > akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)\n\tat
> > akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)\n\tat
> > akka.actor.ActorCell.invoke(ActorCell.scala:495)\n\tat
> > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)\n\tat akka.dis
> >
>