Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/03/22 10:06:00 UTC

[jira] [Commented] (FLINK-8943) Jobs will not recover if DFS is temporarily unavailable

    [ https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409316#comment-16409316 ] 

ASF GitHub Bot commented on FLINK-8943:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/5746

    [FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recovered from HA store

    ## What is the purpose of the change
    
    In HA mode, the Dispatcher should fail if it cannot recover the persisted jobs. The idea
    is that another Dispatcher will then be brought up and retry the recovery. This is better
    than silently dropping the jobs that could not be recovered.
    
    cc @GJL 
    
    ## Brief change log
    
    - Fail the `Dispatcher`/`JobManager` if a persisted job cannot be recovered
    
    ## Verifying this change
    
    - Added `DispatcherTest#testFatalErrorAfterJobIdRecoveryFailure` and `DispatcherTest#testFatalErrorAfterJobRecoveryFailure`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
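
The fail-fast behavior described in the pull request above can be illustrated with a minimal, self-contained sketch. It is not the code from the pull request: `JobLoader`, `FatalErrorHandler`, and `recoverJobs` are hypothetical names introduced here, and job graphs are represented as plain strings. The only point it shows is that a single recovery failure is escalated to a fatal error rather than the affected job being skipped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class FailFastRecoverySketch {

    /** Hypothetical loader for one persisted job; stands in for reading from the HA store. */
    @FunctionalInterface
    public interface JobLoader {
        String load(String jobId) throws Exception;
    }

    /** Hypothetical callback that is notified about unrecoverable errors. */
    @FunctionalInterface
    public interface FatalErrorHandler {
        void onFatalError(Throwable error);
    }

    /**
     * Tries to recover every persisted job. If any job cannot be loaded, the failure is
     * reported as fatal and nothing is returned, instead of silently dropping that job.
     */
    public static List<String> recoverJobs(
            Collection<String> jobIds, JobLoader loader, FatalErrorHandler handler) {
        List<String> recovered = new ArrayList<>();
        try {
            for (String jobId : jobIds) {
                recovered.add(loader.load(jobId));
            }
        } catch (Exception e) {
            // Escalate: the process is expected to terminate so that a new one can retry.
            handler.onFatalError(new IllegalStateException("Could not recover persisted jobs.", e));
            return new ArrayList<>();
        }
        return recovered;
    }

    public static void main(String[] args) {
        // Simulates a storage outage: every load attempt fails.
        List<String> result = recoverJobs(
                Arrays.asList("job-1", "job-2"),
                jobId -> { throw new java.io.IOException("DFS unavailable"); },
                error -> System.err.println("Fatal: " + error.getMessage()));
        System.out.println("Recovered jobs: " + result.size());
    }
}
```

In a real deployment the fatal error handler would typically terminate the process, which is what lets a freshly started master re-attempt the recovery once the storage is reachable again.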


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink failIfJobNotRecoverable

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5746.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5746
    
----
commit d15e5a2897e5b17ee256cac1374bbcee24104fe2
Author: Till Rohrmann <tr...@...>
Date:   2018-03-22T09:46:04Z

    [hotfix] Extend TestingFatalErrorHandler to return an error future

commit 50004f3cfcba112d0e7f05b9875931d25d102110
Author: Till Rohrmann <tr...@...>
Date:   2018-03-22T09:46:28Z

    [hotfix] Add BiFunctionWithException

commit f6a6d2da064ff80125600a3a90e773684dc24715
Author: Till Rohrmann <tr...@...>
Date:   2018-03-21T21:36:33Z

    [FLINK-8943] [ha] Fail Dispatcher if jobs cannot be recovered from HA store
    
    In HA mode, the Dispatcher should fail if it cannot recover the persisted jobs. The idea
    is that another Dispatcher will then be brought up and retry the recovery. This is better
    than silently dropping the jobs that could not be recovered.

----
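
The first hotfix commit above mentions a fatal error handler that returns an error future. One way such a test helper could look is sketched below. This is an assumption for illustration only and not Flink's `TestingFatalErrorHandler`: the class name `RecordingFatalErrorHandler` and its methods are hypothetical. The idea is that a test can wait on the future until the component under test reports a fatal error.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ErrorFutureHandlerSketch {

    /** Hypothetical test helper that exposes the first reported fatal error as a future. */
    public static final class RecordingFatalErrorHandler {
        private final CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();

        public void onFatalError(Throwable error) {
            // complete() only has an effect on the first call, so the first error wins.
            errorFuture.complete(error);
        }

        public CompletableFuture<Throwable> getErrorFuture() {
            return errorFuture;
        }
    }

    public static void main(String[] args) throws Exception {
        RecordingFatalErrorHandler handler = new RecordingFatalErrorHandler();

        // Simulates a component that reports a fatal error from another thread.
        new Thread(() -> handler.onFatalError(new RuntimeException("recovery failed"))).start();

        // A test can block with a timeout until the error arrives.
        Throwable error = handler.getErrorFuture().get(5, TimeUnit.SECONDS);
        System.out.println("Fatal error reported: " + error.getMessage());
    }
}
```

Waiting on a future with a timeout avoids polling and makes a test fail cleanly if the expected fatal error is never reported.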


> Jobs will not recover if DFS is temporarily unavailable
> -------------------------------------------------------
>
>                 Key: FLINK-8943
>                 URL: https://issues.apache.org/jira/browse/FLINK-8943
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0, 1.4.2, 1.6.0
>            Reporter: Gary Yao
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: flip6
>             Fix For: 1.5.0
>
>
> *Description*
> Job graphs are recovered from the DFS only once. If the DFS is unavailable during the recovery attempt, the jobs will simply not run until the master is restarted.
> *Steps to reproduce*
> # Submit a job to a Flink cluster with HDFS as the HA storage directory.
> # Trigger job recovery by killing the master.
> # Stop the HDFS NameNode.
> # Start the HDFS NameNode again after job recovery is over.
> # Verify that the job is not running.
> *Expected behavior*
> The new master should fail fast and exit. The new master should re-attempt the recovery.
> *Stacktrace*
> {noformat}
> 2018-03-14 14:01:37,704 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Could not recover the job graph for a41d50b6f3ac16a730dd12792a847c97.
> org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under /a41d50b6f3ac16a730dd12792a847c97. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
> 	at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:192)
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$recoverJobs$5(Dispatcher.java:557)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.net.ConnectException: Call From ip-172-31-43-54/172.31.43.54 to ip-172-31-32-118.eu-central-1.compute.internal:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> 	at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801)
> 	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
> 	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1435)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1345)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> 	at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
> 	at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:843)
> 	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:832)
> 	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:821)
> 	at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:325)
> 	at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:285)
> 	at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:270)
> 	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1132)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:325)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:322)
> 	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
> 	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
> 	at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:64)
> 	at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:57)
> 	at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:186)
> 	... 7 more
> Caused by: java.net.ConnectException: Connection refused
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> 	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
> 	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:685)
> 	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:788)
> 	at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:410)
> 	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1550)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1381)
> 	... 40 more
> {noformat}


