Posted to user-zh@flink.apache.org by yidan zhao <hi...@gmail.com> on 2021/06/16 11:30:57 UTC

Re: Flink 1.11.2 cluster hits three kinds of connection refusal, causing job failures

Mark. I am hitting the first problem as well; no solution so far.

chaiyi <ch...@163.com> wrote on Mon, Mar 22, 2021 at 12:28 PM:
>
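> Hello,
> I recently set up a three-machine Flink cluster running zk-3.6.2 + hadoop-3.3.0 + flink-1.11.2. The three machines are virtual machines on the same physical host, so network fluctuations should not be causing refused connections, yet connections keep getting refused.
> After the job has been running for a while (as little as a few hours, as long as 3 to 5 days), it dies. I have seen the following three exceptions, all related to network connections. Could you please take a look and tell me whether something is wrong with my Flink network configuration?
> 1. Connection refused in communication between cluster nodes: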
> 2021-03-03 08:50:42,851 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Window(ProcessingTimeSessionWindows(90000), ProcessingTimeTrigger, FlightTrackAggregate, FlightTrackSectorResult) -> Sink: Unnamed (4/4) (3097c00c09b475b35c23782a3b4a8eaa) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@4df09503.
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: readAddress(..) failed: Connection reset by peer (connection to '10.100.1.222/10.100.1.222:43156')
> at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:276) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:268) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1388) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:276) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:918) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:730) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:820) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:424) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:326) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_152]
> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
>
>
> 2. Requests to ZooKeeper fail:
> 2021-03-02 20:27:13,487 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Unable to read additional data from server sessionid 0x30018710a580007, likely server has closed socket, closing socket connection and attempting reconnect
> 2021-03-02 20:27:13,588 INFO  org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [] - State change: SUSPENDED
> 2021-03-02 20:27:13,590 WARN  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - Connection to ZooKeeper suspended. The contender LeaderContender: DefaultDispatcherRunner no longer participates in the leader election.
> 2021-03-02 20:27:13,591 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2021-03-02 20:27:13,591 WARN  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - Connection to ZooKeeper suspended. The contender LeaderContender: StandaloneResourceManager no longer participates in the leader election.
> 2021-03-02 20:27:13,591 WARN  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - Connection to ZooKeeper suspended. The contender http://flink-02:8081 no longer participates in the leader election.
> 2021-03-02 20:27:13,591 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2021-03-02 20:27:13,830 WARN  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-7735477848930862421.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
> 2021-03-02 20:27:13,830 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Opening socket connection to server flink-02/10.100.1.222:2181
> 2021-03-02 20:27:13,831 ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] - Authentication failed
> 2021-03-02 20:27:13,831 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Socket connection established to flink-02/10.100.1.222:2181, initiating session
> 2021-03-02 20:27:13,835 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Session establishment complete on server flink-02/10.100.1.222:2181, sessionid = 0x30018710a580007, negotiated timeout = 40000
> 2021-03-02 20:27:13,835 INFO  org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [] - State change: RECONNECTED
> 2021-03-02 20:27:13,841 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - Connection to ZooKeeper was reconnected. Leader election can be restarted.
> 2021-03-02 20:27:13,842 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
> 2021-03-02 20:27:13,842 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - Connection to ZooKeeper was reconnected. Leader election can be restarted.
> 2021-03-02 20:27:13,843 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - Connection to ZooKeeper was reconnected. Leader election can be restarted.
> 2021-03-02 20:27:13,843 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
>
>
>
>
> 3. The connection to RabbitMQ times out:
> 2021-02-07 18:12:39,761 ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler           [] - An unexpected connection driver error occured
> com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 60 seconds
> at com.rabbitmq.client.impl.AMQConnection.handleSocketTimeout(AMQConnection.java:732) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:651) [blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) [blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) [blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
> 2021-02-07 18:12:40,764 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Error during disposal of stream operator.
> com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 60 seconds
> at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:875) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:991) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:920) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:904) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:896) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.close(AutorecoveringConnection.java:216) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:182) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729) [flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645) [flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549) [flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
> 2021-02-07 18:12:40,766 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Custom Source (1/1) (912e629ede54920b0689fcf0b2398a7b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Messages could not be acknowledged during checkpoint creation.
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.acknowledgeSessionIDs(RMQSource.java:236) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.acknowledgeIDs(MultipleIdsMessageAcknowledgingSourceBase.java:122) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.notifyCheckpointComplete(MessageAcknowledgingSourceBase.java:239) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:107) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:283) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:987) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$10(StreamTask.java:958) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$12(StreamTask.java:974) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
> Caused by: java.io.IOException
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:124) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:120) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:142) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1343) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txCommit(AutorecoveringChannel.java:503) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.acknowledgeSessionIDs(RMQSource.java:234) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> ... 18 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
> at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1343) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txCommit(AutorecoveringChannel.java:503) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.acknowledgeSessionIDs(RMQSource.java:234) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> ... 18 more
> Caused by: com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 60 seconds
> at com.rabbitmq.client.impl.AMQConnection.handleSocketTimeout(AMQConnection.java:732) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:651) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>
>
> The Flink configuration file is as follows:
> jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123
> jobmanager.memory.process.size: 2048m
> taskmanager.memory.process.size: 12288m
> taskmanager.memory.jvm-metaspace.size: 256m
> taskmanager.memory.managed.fraction: 0
> taskmanager.numberOfTaskSlots: 3
> parallelism.default: 3
> task.cancellation.timeout: 0
> high-availability: zookeeper
> high-availability.cluster-id: /track_ns
> high-availability.zookeeper.path.root: /flink_track
> high-availability.storageDir: hdfs://yfcluster/flink/ha/
> recovery.zookeeper.storageDir: hdfs://yfcluster/zookeeper/ha/
> yarn.application-attempts: 10
> high-availability.zookeeper.quorum: flink-01:2181,flink-02:2181,flink-03:2181
> state.backend: rocksdb
> state.checkpoints.dir: hdfs://yfcluster/flink/checkpoints
> state.savepoints.dir: hdfs://yfcluster/flink/savepoints
> state.backend.incremental: false
> jobmanager.execution.failover-strategy: region
> state.checkpoints.num-retained: 3
> state.backend.local-recovery: true
> taskmanager.state.local.root-dirs: /opt/flink-tm-state
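
To see which of the three failures quoted above shows up most often, the log signatures can be counted with a small script. This is only a minimal sketch in Python: the regular expressions are taken from the log excerpts in this thread, while the category names and the function names `classify` and `count_failures` are made up for illustration. It counts matching log lines, not distinct incidents (a single ZooKeeper suspension or RabbitMQ heartbeat loss produces several matching lines), and it does not diagnose the cause.

```python
import re
from collections import Counter

# Signatures copied from the log excerpts in this thread.
# The category names on the left are hypothetical labels for this sketch.
SIGNATURES = {
    "taskmanager_connection_reset": re.compile(
        r"LocalTransportException.*Connection reset by peer"
    ),
    "zookeeper_connection_suspended": re.compile(
        r"Connection to ZooKeeper suspended"
    ),
    "rabbitmq_missed_heartbeat": re.compile(r"MissedHeartbeatException"),
}


def classify(line):
    """Return the first category whose pattern matches the line, else None."""
    for name, pattern in SIGNATURES.items():
        if pattern.search(line):
            return name
    return None


def count_failures(lines):
    """Count matching log lines per category (lines, not distinct incidents)."""
    counts = Counter()
    for line in lines:
        category = classify(line)
        if category is not None:
            counts[category] += 1
    return counts
```

It could be run over a log file with something like `count_failures(open("flink-taskmanager.log"))`, where the file name is only a placeholder. Comparing the timestamps of the matching lines across the three machines may help show whether the failures cluster around the same moments.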