Posted to user@flink.apache.org by Xuekui <ba...@foxmail.com> on 2022/01/24 11:43:24 UTC

Flink job with multiple sink tables can't be started on YARN

Hi all,


I have a Flink job which reads data from one Kafka topic and sinks to two Kafka topics using Flink SQL.


The code is something like this:


tableEnv.executeSql(
  """
  create table sink_table1 (
    xxx
    xxx
  ) with (
    'connector' = 'kafka',
    'topic' = 'topic1'
  )
  """
)


tableEnv.executeSql(
  """
  create table sink_table2 (
    xxx
    xxx
  ) with (
    'connector' = 'kafka',
    'topic' = 'topic2'
  )
  """
)
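

Each sink table is then filled by its own executeSql call (the real queries are elided, so this is only a sketch; "source_table" and the selected columns stand in for my actual source and query):

// Sketch only: each INSERT is submitted with a separate executeSql call.
tableEnv.executeSql("insert into sink_table1 select xxx from source_table")
tableEnv.executeSql("insert into sink_table2 select xxx from source_table")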



The code works well in local MiniCluster mode.
But when I deploy it on YARN, two applications are launched: one runs well and the other fails with the following error


2022-01-24 11:16:25,374 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569) [flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99) [flink-dist_2.11-1.12.2.jar:1.12.2]
Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
	at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:234) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_161]
	at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_161]
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	... 2 more
Caused by: java.net.BindException: Could not start rest endpoint on any port in port range 42888
	at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:234) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_161]
	at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_161]
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
	... 2 more



It seems that the two sink tables trigger two YARN applications, each application starts its own job manager, and both job managers try to bind the same REST port on the same host, so only one application can run successfully and the other fails.
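

As a workaround for the port clash itself, maybe the REST endpoint could pick a free port from a range instead of a fixed one, e.g. in flink-conf.yaml (the range below is just an example):

rest.bind-port: 42888-42988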


Is there anything I can do to start both applications?
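

For example, would grouping both INSERTs into a single StatementSet keep them in one job? A rough sketch (table names as in the code above, "source_table" is a placeholder):

val stmtSet = tableEnv.createStatementSet()
// Add both INSERTs to the same set so they are planned together...
stmtSet.addInsertSql("insert into sink_table1 select xxx from source_table")
stmtSet.addInsertSql("insert into sink_table2 select xxx from source_table")
// ...and submit them as a single job.
stmtSet.execute()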


Thanks
Xuekui