Posted to user-zh@flink.apache.org by jester_jim <je...@sina.com> on 2020/09/29 06:19:16 UTC

Problems configuring an HDFS state backend in Flink

Hello, everyone in the Flink Chinese community:
I am a beginner and do not know Flink well yet. To learn it, I set up a Flink standalone cluster (1.11.2, JDK 1.8); the cluster itself runs fine, and jobs run without problems. Recently I needed a state backend, and while configuring HDFS I ran into a problem I cannot make sense of; I did not find a solution in the issues either. Roughly: I configured an HDFS state backend in the Flink configuration file, but the Hadoop cluster (CDH2.5.0) is a production cluster with Kerberos authentication, and directory permissions are not wide open. The Flink configuration is as follows:
state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
state.checkpoints.dir: hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints
# Default target directory for savepoints, optional.
#
state.savepoints.dir: hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints
Apart from this there is also the Kerberos authentication configuration, which causes no problems. As shown below, once the three configuration lines above are commented out, the job runs normally:
2020-09-29 11:27:20,430 INFO  org.apache.hadoop.security.UserGroupInformation              [] - Login successful for user jester/principle using keytab file /kafka/flink/flink-1.11.2/conf/jester.keytab
Job has been submitted with JobID 41c01241338f1c7112d48f277701d9c3
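
For reference, the Kerberos login options in flink-conf.yaml typically take roughly this shape; the principal and keytab path below are copied from the log line above, and the remaining values are assumptions about the exact file, not its confirmed contents:

    security.kerberos.login.use-ticket-cache: false
    security.kerberos.login.keytab: /kafka/flink/flink-1.11.2/conf/jester.keytab
    security.kerberos.login.principal: jester/principle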


But if they are not commented out, submitting a job throws the exception below.
(This note is placed ahead of the exception text:
The exception says Flink tried to create something under / but has no write permission there, and that permission will certainly never be granted. Yet I explicitly specified a directory in the Flink configuration, so I cannot understand why anything would still be created under /. Apart from that, for the Hadoop configuration I copied the configuration directory straight from the Hadoop cluster and set HADOOP_CONF_DIR in the startup script. (This actually seems questionable to me, because hdfs-site.xml and core-site.xml contain many settings that are only usable on the Hadoop cluster's own machines; I don't know whether I should delete the unused settings and add the necessary ones, and I would appreciate a reference document.) As for the Hadoop jars, I simply used `$HADOOP_HOME/bin/hadoop classpath`.
One more question: Hadoop is configured with HA, so how do I configure multiple namenodes for checkpoints?
)
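
For reference, the usual way to address an HA HDFS cluster is through a nameservice rather than a single namenode:port. A minimal client-side sketch follows; the nameservice name "mycluster" and the second namenode host are invented for illustration, not the cluster's actual values:

    # hdfs-site.xml on the Flink machines (shown as key = value for brevity):
    dfs.nameservices = mycluster
    dfs.ha.namenodes.mycluster = nn1,nn2
    dfs.namenode.rpc-address.mycluster.nn1 = namenode58:9000
    dfs.namenode.rpc-address.mycluster.nn2 = namenode59:9000
    dfs.client.failover.proxy.provider.mycluster = org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

    # flink-conf.yaml then refers to the nameservice, with no port:
    state.checkpoints.dir: hdfs://mycluster/NS3/user/jester/flink/flink-checkpoints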
2020-09-29 11:21:20,446 INFO  org.apache.hadoop.security.UserGroupInformation              [] - Login successful for user  jester/principle  using keytab file /kafka/flink/flink-1.11.2/conf/unicom_jiangt37.keytab


------------------------------------------------------------
 The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'OracleSource'.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'OracleSource'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
at OracleSource.OracleSourceMain$.main(OracleSourceMain.scala:52)
at OracleSource.OracleSourceMain.main(OracleSourceMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:366)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:292)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:305)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:224)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:483)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
... 7 more
Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=jester, access=WRITE, inode="/":hadoop:hadoop:drwxr-xr-x
at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:279)
at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:260)
at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:240)
at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:162)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:152)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:3529)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:3512)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:3494)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6615)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4385)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4355)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4328)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:873)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:323)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:618)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2220)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2216)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2214)


at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2744)
at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2713)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:870)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:866)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:866)
at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:859)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1819)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.initializeBaseLocations(FsCheckpointStorage.java:111)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:303)
... 22 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=jester, access=WRITE, inode="/":hadoop:hadoop:drwxr-xr-x
at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:279)
at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:260)
at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:240)
at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:162)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:152)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:3529)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:3512)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:3494)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6615)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4385)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4355)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4328)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:873)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:323)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:618)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2220)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2216)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2214)


at org.apache.hadoop.ipc.Client.call(Client.java:1470)
at org.apache.hadoop.ipc.Client.call(Client.java:1401)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy48.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy49.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2742)
... 32 more


End of exception on server side>]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 4 more
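
A side note on the trace above: the failing frames show FsCheckpointStorage.initializeBaseLocations() calling FileSystem.mkdirs() on the configured checkpoint path, and HDFS mkdirs creates every missing ancestor directory, so a missing /NS3 means the client needs WRITE on / itself. A minimal standalone sketch of the same call, assuming a Hadoop client on the classpath and using the path from the config above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MkdirsDemo {
        public static void main(String[] args) throws Exception {
            // Picks up core-site.xml / hdfs-site.xml from the classpath (HADOOP_CONF_DIR).
            Configuration conf = new Configuration();
            Path dir = new Path("hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints");
            FileSystem fs = dir.getFileSystem(conf);
            // Like Flink's checkpoint storage, mkdirs() behaves like `mkdir -p`:
            // every missing ancestor (here possibly /NS3) is created as well,
            // which is what triggers the WRITE check on "/" seen in the trace.
            fs.mkdirs(dir);
        }
    }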

Re: Problems configuring an HDFS state backend in Flink

Posted by Robin Zhang <vi...@outlook.com>.
Hi jester_jim,
    
The checkpoints directory specified in the configuration file (hereafter "ckp" directory) is only a parent directory. When Flink triggers the first ckp of each job, it creates a multi-level folder hierarchy under that parent, named after the specified job name / job id. So simply creating the parent directory is not enough; permission problems can still occur underneath it, as sketched below.
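
A minimal sketch of the practical consequence, assuming the paths from the original mail (the ownership change and the job id are illustrative):

    # Pre-create the checkpoint parent and hand it to the job's user, so that
    # Flink can create the per-job subdirectories beneath it:
    hdfs dfs -mkdir -p /NS3/user/jester/flink/flink-checkpoints
    hdfs dfs -chown -R jester /NS3/user/jester/flink

    # After the first completed checkpoint, the layout looks roughly like:
    #   /NS3/user/jester/flink/flink-checkpoints/<job-id>/chk-1/_metadata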

  Best regards, Robin Zhang



