Posted to user@flink.apache.org by Averell <lv...@gmail.com> on 2018/10/10 07:23:22 UTC

Error restoring from savepoint while there's no modification to the job

Hi everyone,

I'm getting the following error when trying to restore from a savepoint.
Below is the output from the flink command, and a TaskManager (TM) log is
attached. I made no changes to the app between taking the savepoint and
restoring from it. All Window operators have been assigned a unique ID string.

Could you please take a look?

Thanks and best regards,
Averell

taskmanager.gz
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/taskmanager.gz>  

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 606ad5239f5e23cedb85d3e75bf76463)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:664)
	at com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam$.main(StreamingSdcWithAverageByDslam.scala:442)
	at com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam.main(StreamingSdcWithAverageByDslam.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
	... 22 more
Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_b7287b12f90aa788ab162856424c6d40_(8/64) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
	... 5 more
Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:475)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:438)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:377)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Error restoring from savepoint while there's no modification to the job

Posted by Le-Van Huyen <lv...@gmail.com>.
Hi Stefan, Dawid,

I hadn't changed anything in the configuration. Env's parallelism stayed at
64. Some source/sink operators have parallelism of 1 to 8. I'm using Flink
1.7-SNAPSHOT, with the code pulled from master about 5 days back. Savepoint
was saved to either S3 or HDFS (I tried multiple times), and had not been
moved.

Is there any kind of improper user code that could cause such an error?

Thanks and best regards,
Averell

On Wed, Oct 10, 2018 at 7:02 PM Stefan Richter <s....@data-artisans.com>
wrote:

> Hi,
>
> adding to Dawid's questions, it would also be very helpful to know which
> Flink version was used to create the savepoint, which Flink version was
> used in the restore attempt, and whether the savepoint was moved or
> modified. Outside of potential conflicts with those things, I would not
> expect anything like this.
>
> Best,
> Stefan
>
> > On 10. Oct 2018, at 09:51, Dawid Wysakowicz <dw...@apache.org>
> wrote:
> >
> > Hi Averell,
> >
> > Are you trying to scale the job up, i.e. increasing the job
> > parallelism? Have you by any chance increased the job's max parallelism?
> > If so, that is not supported. The max parallelism parameter is used to
> > create key groups, which are then assigned to the parallel operator
> > instances. This parameter cannot be changed for a job that is to be
> > restored.
> >
> > If this is not the case, maybe Stefan (cc'd) has some ideas about what
> > could be going wrong.
> >
> > Best,
> >
> > Dawid
> >
> >

Re: Error restoring from savepoint while there's no modification to the job

Posted by Averell <lv...@gmail.com>.
Thank you Stefan, I'll follow your guidance and debug this.

And sorry for the confusion in my previous email. When I said "different
builds", I meant different versions of my application, not different builds
of Flink.

Between versions of my application, I do add or remove some operators. However,
as I mentioned in the first email, I get errors when restoring a savepoint
created by the same version of my application.

Thanks and best regards,
Averell

Re: Error restoring from savepoint while there's no modification to the job

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

I see, then the important question for me is whether the problem exists on the release/master code or just on your branches. Of course, we can hardly give any advice for custom builds, and without any code. In general, you should debug in HeapKeyedStateBackend lines 774-776 (the write part) and check against lines 472-474 (the read part). What happens there is very straightforward: the write part remembers the offset of the output stream and writes the key-group. The read part then seeks to the remembered offset and reads the key-group. They must match.
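
The write/read symmetry described above can be illustrated with a small standalone sketch. This is not the HeapKeyedStateBackend code; the class name, the method names and the placeholder payload are all hypothetical, and only the idea (remember the offset, write the key-group marker, later seek back and compare) is taken from the description.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not Flink's implementation.
public final class KeyGroupOffsetSketch {

    // Write side: remember the current stream offset for each key group,
    // then write the key-group id as a marker followed by a placeholder payload.
    public static Map<Integer, Long> write(ByteArrayOutputStream bytes, int[] keyGroups) {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int keyGroup : keyGroups) {
                offsets.put(keyGroup, (long) out.size()); // bytes written so far
                out.writeInt(keyGroup);                   // key-group marker
                out.writeUTF("state-of-" + keyGroup);     // made-up payload
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offsets;
    }

    // Read side: seek to each remembered offset and check that the marker
    // found there is the key group that was expected.
    public static void verify(byte[] data, Map<Integer, Long> offsets) {
        ByteBuffer in = ByteBuffer.wrap(data);
        for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
            in.position(entry.getValue().intValue());
            int found = in.getInt();
            if (found != entry.getKey()) {
                throw new IllegalStateException("Unexpected key-group in restore: expected "
                        + entry.getKey() + ", found " + found);
            }
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Map<Integer, Long> offsets = write(bytes, new int[] {14, 15, 16});
        verify(bytes.toByteArray(), offsets);
        System.out.println("all key-group markers matched");
    }
}
```

In this sketch, a mismatch can only come from an offset that does not point at the marker that was written for it, or from bytes that changed between write and read, which is why comparing the two sides in a debugger is a reasonable first step.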

Best,
Stefan

> On 15. Oct 2018, at 11:35, Averell <lv...@gmail.com> wrote:
> 
> Hi Kostas, Stefan,
> 
> The problem doesn't occur in all of my builds, so it is a little
> difficult to track down. Are there any specific classes that I can turn
> DEBUG on for to help find the problem? (Turning DEBUG on globally seems
> too much.) I will try to minimize the code and post it.
> 
> One more thing I noticed is that the error doesn't stay on one single
> operator but changes from time to time (even within the same build). For
> example, the previous exception I quoted was with a Window operator, while
> the one below is with CoStreamFlatMap.
> 
> Thanks and best regards,
> Averell
> 
> Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for CoStreamFlatMap_68cd726422cf10170c4d6c7fd52ed309_(12/64) from any of the 1 provided restore options.
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
> 	... 5 more
> Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
> 	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:475)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:438)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:377)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
> 	... 7 more


Re: Error restoring from savepoint while there's no modification to the job

Posted by Averell <lv...@gmail.com>.
Hi Kostas, Stefan,

The problem doesn't occur in all of my builds, so it is a little
difficult to track down. Are there any specific classes that I can turn DEBUG
on for to help find the problem? (Turning DEBUG on globally seems too much.)
I will try to minimize the code and post it.

One more thing I noticed is that the error doesn't stay on one single
operator but changes from time to time (even within the same build). For
example, the previous exception I quoted was with a Window operator, while
the one below is with CoStreamFlatMap.

Thanks and best regards,
Averell

Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for CoStreamFlatMap_68cd726422cf10170c4d6c7fd52ed309_(12/64) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
	... 5 more
Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:475)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:438)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:377)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
	... 7 more

Re: Error restoring from savepoint while there's no modification to the job

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

I think it is rather unlikely that this is the problem, because it should give a different kind of exception. Would it be possible to provide a minimal, self-contained code example of a problematic job?

Best,
Stefan

> On 15. Oct 2018, at 08:29, Averell <lv...@gmail.com> wrote:
> 
> Hi everyone,
> 
> In the StreamExecutionEnvironment.createFileInput method, a file source is
> created as follows:
> 
>     SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
>             .transform("Split Reader: " + sourceName, typeInfo, reader);
> 
> Does this create two different operators? If so, it seems impossible
> to assign a UID to the first operator. Could that be the cause of my
> problem?
> 
> Thanks and best regards,
> Averell


Re: Error restoring from savepoint while there's no modification to the job

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Averell,

This could be the root cause of your problem!
Thanks for digging into it.

Would it be possible for you to verify that this is your problem by manually setting
the UID and seeing if the problem disappears? In addition, please file a JIRA.

Thanks a lot,
Kostas
 
> On Oct 15, 2018, at 8:29 AM, Averell <lv...@gmail.com> wrote:
> 
> Hi everyone,
> 
> In the StreamExecutionEnvironment.createFileInput method, a file source is
> created as follows:
> 
>     SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
>             .transform("Split Reader: " + sourceName, typeInfo, reader);
> 
> Does this create two different operators? If so, it seems impossible
> to assign a UID to the first operator. Could that be the cause of my
> problem?
> 
> Thanks and best regards,
> Averell


Re: Error restoring from savepoint while there's no modification to the job

Posted by Averell <lv...@gmail.com>.
Hi everyone,

In the StreamExecutionEnvironment.createFileInput method, a file source is
created as follows:

    SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
            .transform("Split Reader: " + sourceName, typeInfo, reader);

Does this create two different operators? If so, it seems impossible
to assign a UID to the first operator. Could that be the cause of my
problem?

Thanks and best regards,
Averell

Re: Error restoring from savepoint while there's no modification to the job

Posted by Averell <lv...@gmail.com>.
Hi Kostas,

No, the same code was used.
I (1) started the job, (2) created a savepoint, (3) cancelled the job, and (4)
restored the job with the same command as in (1), with the addition of "-s
<savepoint_path>".

Regards,
Averell

Re: Error restoring from savepoint while there's no modification to the job

Posted by Kostas Kloudas <k....@data-artisans.com>.
Are you restoring your job with the custom source from a savepoint taken without the custom source?


> On Oct 10, 2018, at 11:34 AM, Averell <lv...@gmail.com> wrote:
> 
> Hi Kostas,
> 
> Yes, I modified ContinuousFileMonitoringFunction to add one more
> ListState<Long>. The error may well have come from that, but I haven't
> been able to find out why.
> 
> All of my keyed streams are keyed by Scala tuples, like
> keyBy(r => (r.customer_id, r.address)), and the fields used as keys are of
> type either String or Long. For these, I don't have to define equals and
> hashCode methods, do I?
> 
> Thanks and best regards,
> Averell


Re: Error restoring from savepoint while there's no modification to the job

Posted by Averell <lv...@gmail.com>.
Hi Kostas,

Yes, I modified ContinuousFileMonitoringFunction to add one more
ListState<Long>. The error may well have come from that, but I haven't
been able to find out why.

All of my keyed streams are keyed by Scala tuples, like
keyBy(r => (r.customer_id, r.address)), and the fields used as keys are of
type either String or Long. For these, I don't have to define equals and
hashCode methods, do I?
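
For reference, the property at stake in the question above is that a key must compare and hash by value, so that the same logical key always lands in the same place. Scala tuples of String and Long should already behave this way, since tuples have structural equals and hashCode. The Java sketch below contrasts a value-based key with an identity-based one; both classes are hypothetical and exist only for illustration.

```java
import java.util.Objects;

// Hypothetical illustration of value-based versus identity-based keys.
public final class CompositeKeySketch {

    // Value-based key: equality and hash are derived from the fields only,
    // so two instances built from the same values are interchangeable.
    public static final class CustomerAddressKey {
        private final long customerId;
        private final String address;

        public CustomerAddressKey(long customerId, String address) {
            this.customerId = customerId;
            this.address = address;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof CustomerAddressKey)) {
                return false;
            }
            CustomerAddressKey other = (CustomerAddressKey) o;
            return customerId == other.customerId && address.equals(other.address);
        }

        @Override
        public int hashCode() {
            // Long and String hash codes are specified by the JDK,
            // so this value is stable across JVM runs.
            return Objects.hash(customerId, address);
        }
    }

    // Identity-based key: inherits Object.equals/hashCode, so two distinct
    // instances are never equal even when their fields match.
    public static final class IdentityKey {
        private final long customerId;

        public IdentityKey(long customerId) {
            this.customerId = customerId;
        }
    }

    public static void main(String[] args) {
        CustomerAddressKey a = new CustomerAddressKey(7L, "1 Main St");
        CustomerAddressKey b = new CustomerAddressKey(7L, "1 Main St");
        System.out.println("value keys equal: " + a.equals(b)
                + ", same hash: " + (a.hashCode() == b.hashCode()));
        System.out.println("identity keys equal: "
                + new IdentityKey(7L).equals(new IdentityKey(7L)));
    }
}
```

A key type like IdentityKey would be a problem for keyed state, while a tuple of String and Long corresponds to the first, value-based case.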

Thanks and best regards,
Averell

Re: Error restoring from savepoint while there's no modification to the job

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Averell,

In the logs there are some entries for “Split Reader: Custom File Source:”.
Is this a custom source that you implemented?
Also, is your keySelector deterministic, with proper equals and hashCode methods?

Cheers,
Kostas

> On Oct 10, 2018, at 10:50 AM, Averell <lv...@gmail.com> wrote:
> 
> Hi Stefan, Dawid,
> 
> I hadn't changed anything in the configuration. Env's parallelism stayed at
> 64. Some source/sink operators have parallelism of 1 to 8. I'm using Flink
> 1.7-SNAPSHOT, with the code pulled from the master branch about 5 days back.
> Savepoint was saved to either S3 or HDFS (I tried multiple times), and had
> not been moved.
> 
> Is there any kind of improper user code that could cause such an error?
> 
> Thanks for your support.
> 
> Best regards,
> Averell


Re: Error restoring from savepoint while there's no modification to the job

Posted by Averell <lv...@gmail.com>.
Hi Stefan, Dawid,

I hadn't changed anything in the configuration. Env's parallelism stayed at
64. Some source/sink operators have parallelism of 1 to 8. I'm using Flink
1.7-SNAPSHOT, with the code pulled from the master branch about 5 days back.
Savepoint was saved to either S3 or HDFS (I tried multiple times), and had
not been moved.

Is there any kind of improper user code that could cause such an error?

Thanks for your support.

Best regards,
Averell

Re: Error restoring from savepoint while there's no modification to the job

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

adding to Dawid's questions, it would also be very helpful to know which Flink version was used to create the savepoint, which Flink version was used in the restore attempt, and whether the savepoint was moved or modified. Outside of potential conflicts with those things, I would not expect anything like this.

Best,
Stefan

> On 10. Oct 2018, at 09:51, Dawid Wysakowicz <dw...@apache.org> wrote:
> 
> Hi Averell,
> 
> Are you trying to scale the job up, i.e. increasing the job
> parallelism? Have you by any chance increased the job's max parallelism?
> If so, that is not supported. The max parallelism parameter is used to
> create key groups, which are then assigned to the parallel operator
> instances. This parameter cannot be changed for a job that is to be
> restored.
> 
> If this is not the case, maybe Stefan (cc'd) has some ideas about what
> could be going wrong.
> 
> Best,
> 
> Dawid
> 
> 


Re: Error restoring from savepoint while there's no modification to the job

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Averell,

Are you trying to scale the job up, i.e. did you increase the job
parallelism? Did you by any chance also increase the job's max
parallelism? If so, that is not supported. The max parallelism parameter
determines the number of key groups, which are then assigned to the
parallel operator instances. This parameter cannot be changed for a job
that is to be restored from a savepoint.
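
To illustrate why a different max parallelism makes the stored key groups
unusable, here is a minimal, self-contained sketch of the arithmetic. It is
not Flink code: the class and method names are made up for this example, the
formulas are modelled on what Flink's KeyGroupRangeAssignment does as far as
I know, the hashing is simplified to a plain modulo, and the values 128 and
256 are only illustrative (the thread does not say which max parallelism the
job uses). Only the parallelism of 64 and the subtask "(8/64)" come from the
stack trace.

```java
public class KeyGroupSketch {

    // Stand-in for the key -> key-group mapping (assumption: Flink also
    // scrambles key.hashCode() with a murmur hash; plain modulo used here).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Zero-based index of the parallel subtask that owns a key group.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inclusive [first, last] range of key groups owned by one subtask.
    static int[] keyGroupRange(int operatorIndex, int maxParallelism, int parallelism) {
        int first = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int last = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {first, last};
    }

    public static void main(String[] args) {
        int parallelism = 64;
        int subtask = 7; // zero-based index of subtask "(8/64)" from the trace
        int key = 200;   // hypothetical key whose hashCode() is 200

        for (int maxParallelism : new int[] {128, 256}) {
            int[] range = keyGroupRange(subtask, maxParallelism, parallelism);
            int group = keyGroupFor(key, maxParallelism);
            System.out.printf(
                "maxParallelism=%d: subtask %d owns key groups [%d, %d]; "
                    + "key %d is in key group %d (subtask %d)%n",
                maxParallelism, subtask, range[0], range[1],
                key, group, operatorIndexFor(group, maxParallelism, parallelism));
        }
    }
}
```

With a max parallelism of 128 the subtask owns key groups 14 to 15, with 256
it owns 28 to 31, and the same key moves from key group 72 to key group 200.
State written under one numbering cannot be matched to the other, which
would be consistent with the "Unexpected key-group in restore" check failing.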

If this is not the case, maybe Stefan (cc) has some ideas about what
could be going wrong.

Best,

Dawid

