You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Chief <co...@foxmail.com> on 2020/04/13 14:49:04 UTC

flink 1.7.2 YARN Session模式提交任务问题求助

大家好
目前环境是flink 1.7.2,使用YARN Session模式提交任务,Hadoop 版本2.7.3,hdfs namenode配置了ha模式,提交任务的时候报以下错误,系统环境变量中已经设置了HADOOP_HOME,YARN_CONF_DIR,HADOOP_CONF_DIR,HADOOP_CLASSPATH,在flink_conf.yaml中配置了fs.hdfs.hadoopconf


2020-04-10 19:12:02,908 INFO&nbsp; org.apache.flink.runtime.jobmaster.JobMaster&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; - Connecting to ResourceManager akka.tcp://flink@trusfortpoc1:23584/user/resourcemanager(00000000000000000000000000000000)
2020-04-10 19:12:02,909 INFO&nbsp; org.apache.flink.runtime.jobmaster.slotpool.SlotPool&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}]
2020-04-10 19:12:02,911 INFO&nbsp; org.apache.flink.runtime.jobmaster.JobMaster&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; - Resolved ResourceManager address, beginning registration
2020-04-10 19:12:02,911 INFO&nbsp; org.apache.flink.runtime.jobmaster.JobMaster&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; - Registration at ResourceManager attempt 1 (timeout=100ms)
2020-04-10 19:12:02,912 INFO&nbsp; org.apache.flink.runtime.jobmaster.slotpool.SlotPool&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}]
2020-04-10 19:12:02,913 INFO&nbsp; org.apache.flink.yarn.YarnResourceManager&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;- Registering job manager 00000000000000000000000000000000@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0 for job 24691b33c18d7ad73b1f52edb3d68ae4.
2020-04-10 19:12:02,917 INFO&nbsp; org.apache.flink.yarn.YarnResourceManager&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;- Registered job manager 00000000000000000000000000000000@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0 for job 24691b33c18d7ad73b1f52edb3d68ae4.
2020-04-10 19:12:02,919 INFO&nbsp; org.apache.flink.runtime.jobmaster.JobMaster&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2020-04-10 19:12:02,919 INFO&nbsp; org.apache.flink.runtime.jobmaster.slotpool.SlotPool&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; - Requesting new slot [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
2020-04-10 19:12:02,920 INFO&nbsp; org.apache.flink.yarn.YarnResourceManager&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;- Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id AllocationID{5a12237c7f2bd8b1cc760ddcbab5a1c0}.
2020-04-10 19:12:02,921 INFO&nbsp; org.apache.flink.runtime.jobmaster.slotpool.SlotPool&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; - Requesting new slot [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
2020-04-10 19:12:02,924 INFO&nbsp; org.apache.flink.yarn.YarnResourceManager&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;- Requesting new TaskExecutor container with resources <memory:4096, vCores:6&gt;. Number pending requests 1.
2020-04-10 19:12:02,926 INFO&nbsp; org.apache.flink.yarn.YarnResourceManager&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;- Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id AllocationID{37dd666a18040bf63ffbf2e022b2ea9b}.
2020-04-10 19:12:06,531 INFO&nbsp; org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;- Received new token for : trusfortpoc3:35206
2020-04-10 19:12:06,543 INFO&nbsp; org.apache.flink.yarn.YarnResourceManager&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;- Received new container: container_1586426824930_0006_01_000002 - Remaining pending container requests: 1
2020-04-10 19:12:06,543 INFO&nbsp; org.apache.flink.yarn.YarnResourceManager&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;- Removing container request Capability[<memory:4096, vCores:6&gt;]Priority[1]. Pending container requests 0.
2020-04-10 19:12:06,568 ERROR org.apache.flink.yarn.YarnResourceManager&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;- Could not start TaskManager in container container_1586426824930_0006_01_000002.
java.lang.IllegalArgumentException: java.net.UnknownHostException: hdfsClusterForML
	at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
	at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:320)
	at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
	at org.apache.hadoop.hdfs.DFSClient.<init&gt;(DFSClient.java:687)
	at org.apache.hadoop.hdfs.DFSClient.<init&gt;(DFSClient.java:628)
	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2701)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2683)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:372)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:453)
	at org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:555)
	at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:390)
	at org.apache.flink.yarn.YarnResourceManager$$Lambda$183/1182651376.run(Unknown Source)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.net.UnknownHostException: hdfsClusterForML
	... 33 more


这个hdfsClusterForML是namenode ha 的nameservice,经过分析是没加载hdfs-site.xml配置导致的,
也尝试过把Hadoop的几个配置文件放到flink 的conf目录下但都无效,最终通过改YarnResourceManager源码后能够正常提交任务。
public YarnResourceManager(
&nbsp; &nbsp; &nbsp; RpcService rpcService,
&nbsp; &nbsp; &nbsp; String resourceManagerEndpointId,
&nbsp; &nbsp; &nbsp; ResourceID resourceId,
&nbsp; &nbsp; &nbsp; Configuration flinkConfig,
&nbsp; &nbsp; &nbsp; Map<String, String&gt; env,
&nbsp; &nbsp; &nbsp; HighAvailabilityServices highAvailabilityServices,
&nbsp; &nbsp; &nbsp; HeartbeatServices heartbeatServices,
&nbsp; &nbsp; &nbsp; SlotManager slotManager,
&nbsp; &nbsp; &nbsp; MetricRegistry metricRegistry,
&nbsp; &nbsp; &nbsp; JobLeaderIdService jobLeaderIdService,
&nbsp; &nbsp; &nbsp; ClusterInformation clusterInformation,
&nbsp; &nbsp; &nbsp; FatalErrorHandler fatalErrorHandler,
&nbsp; &nbsp; &nbsp; @Nullable String webInterfaceUrl,
&nbsp; &nbsp; &nbsp; JobManagerMetricGroup jobManagerMetricGroup) {
&nbsp; &nbsp;super(
&nbsp; &nbsp; &nbsp; rpcService,
&nbsp; &nbsp; &nbsp; resourceManagerEndpointId,
&nbsp; &nbsp; &nbsp; resourceId,
&nbsp; &nbsp; &nbsp; highAvailabilityServices,
&nbsp; &nbsp; &nbsp; heartbeatServices,
&nbsp; &nbsp; &nbsp; slotManager,
&nbsp; &nbsp; &nbsp; metricRegistry,
&nbsp; &nbsp; &nbsp; jobLeaderIdService,
&nbsp; &nbsp; &nbsp; clusterInformation,
&nbsp; &nbsp; &nbsp; fatalErrorHandler,
&nbsp; &nbsp; &nbsp; jobManagerMetricGroup);
&nbsp; &nbsp;this.flinkConfig&nbsp; = flinkConfig;
&nbsp; &nbsp;this.yarnConfig = new YarnConfiguration(HadoopUtils.getHadoopConfiguration(flinkConfig));
但我认为这肯定不是解决这个问题的方法,所以向大家求助,是不是我忽略什么。