Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/01/10 15:29:58 UTC

[jira] [Commented] (FLINK-5407) Savepoint for iterative Task fails.

    [ https://issues.apache.org/jira/browse/FLINK-5407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15815285#comment-15815285 ] 

ASF GitHub Bot commented on FLINK-5407:
---------------------------------------

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/3088

    [FLINK-5407] Fix savepoints for iterative jobs

    This PR fixes savepoints for iterative jobs. Savepoints failed with an NPE because the code assumed that operators in an operator chain are never null. For iterative jobs, however, an operator in the chain can be null.
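
As an illustration of the kind of guard this description implies, here is a minimal sketch of snapshotting a chain that tolerates null entries. It is not the actual Flink patch: the class NullSafeChainSnapshot, the Operator interface, and the method snapshotAll are hypothetical names made up for this example, and real Flink operators and state handles look different.

```java
import java.util.ArrayList;
import java.util.List;

public class NullSafeChainSnapshot {

    /** Hypothetical stand-in for an operator that can snapshot its state. */
    interface Operator {
        String snapshotState(long checkpointId);
    }

    /**
     * Snapshots every operator in the chain. A null slot (assumed here to be
     * possible for iteration heads/tails) yields a null entry instead of
     * throwing a NullPointerException, so positions stay aligned with the chain.
     */
    static List<String> snapshotAll(Operator[] chain, long checkpointId) {
        List<String> handles = new ArrayList<>(chain.length);
        for (Operator op : chain) {
            handles.add(op == null ? null : op.snapshotState(checkpointId));
        }
        return handles;
    }

    public static void main(String[] args) {
        Operator map = id -> "map-state@" + id;
        // First slot is null, mimicking a chain with a missing operator.
        Operator[] chain = { null, map };
        System.out.println(snapshotAll(chain, 7L)); // prints [null, map-state@7]
    }
}
```

Keeping a null placeholder rather than skipping the slot is one possible design choice; it preserves the index of each operator in the chain, which matters if state is later matched back to operators by position.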


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink NPE-Iterative-Snapshot

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3088.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3088
    
----
commit 984d596c063b5082520d8d58baa6b7361b1e9921
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-01-05T13:28:50Z

    [FLINK-5407] Handle snapshoting null-operator in chain

commit c96fe7ba35764b4f9e05ed61199b2027981daa54
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-01-10T15:08:06Z

    [FLINK-5407] IT case for savepoint with iterative job

----


> Savepoint for iterative Task fails.
> -----------------------------------
>
>                 Key: FLINK-5407
>                 URL: https://issues.apache.org/jira/browse/FLINK-5407
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>            Reporter: Stephan Ewen
>            Assignee: Stefan Richter
>             Fix For: 1.2.0, 1.3.0
>
>         Attachments: SavepointBug.java
>
>
> Flink 1.2-SNAPSHOT (Commit: 5b54009) on Windows.
> When a savepoint was triggered for a streaming job, both the savepoint and the job failed.
> The job failed with the following exception:
> {code}
> java.lang.RuntimeException: Error while triggering checkpoint for IterationSource-7 (1/1)
> 	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1026)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> 	at java.util.concurrent.FutureTask.run(Unknown Source)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> 	at java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorIdentifier(StreamTask.java:767)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.access$500(StreamTask.java:115)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.createStreamFactory(StreamTask.java:986)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:956)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:583)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:551)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:511)
> 	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1019)
> 	... 5 more
> And the savepoint failed with the following exception:
> Using address /127.0.0.1:6123 to connect to JobManager.
> Triggering savepoint for job 153310c4a836a92ce69151757c6b73f1.
> Waiting for response...
> ------------------------------------------------------------
>  The program finished with the following exception:
> java.lang.Exception: Failed to complete savepoint
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:793)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:782)
>         at org.apache.flink.runtime.concurrent.impl.FlinkFuture$6.recover(FlinkFuture.java:263)
>         at akka.dispatch.Recover.internal(Future.scala:267)
>         at akka.dispatch.japi$RecoverBridge.apply(Future.scala:183)
>         at akka.dispatch.japi$RecoverBridge.apply(Future.scala:181)
>         at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
>         at scala.util.Try$.apply(Try.scala:161)
>         at scala.util.Failure.recover(Try.scala:185)
>         at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
>         at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>         at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>         at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Checkpoint failed: Checkpoint Coordinator is shutting down
>         at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:338)
>         at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.shutdown(CheckpointCoordinator.java:245)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.postRunCleanup(ExecutionGraph.java:1065)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.jobVertexInFinalState(ExecutionGraph.java:1034)
>         at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.subtaskInFinalState(ExecutionJobVertex.java:435)
>         at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.vertexCancelled(ExecutionJobVertex.java:407)
>         at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionCanceled(ExecutionVertex.java:593)
>         at org.apache.flink.runtime.executiongraph.Execution.cancelingComplete(Execution.java:729)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1105)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:687)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:686)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:686)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>         ... 2 more
> Caused by: java.lang.Exception: Checkpoint Coordinator is shutting down
>         ... 20 more
> {code}
> Maybe worth mentioning: the iteration body contains a MapFunction, and its thread was sleeping (a sleep added manually) during the savepoint.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)