Posted to user@flink.apache.org by chenxuying <cx...@163.com> on 2021/04/25 12:43:31 UTC
Wrong Kafka connector options make the cluster unable to run any job
environment:
flinksql 1.12.2
k8s session mode
description:
I got the following error log when my Kafka connector port was wrong:
>>>>>
2021-04-25 16:49:50
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition filebeat_json_install_log-3 could be determined
>>>>>
I got the following error log when my Kafka connector IP was wrong:
>>>>>
2021-04-25 20:12:53
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
>>>>>
When the job was cancelled, there was the following error log:
>>>>>
2021-04-25 08:53:41,115 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job v2_ods_device_action_log (fcc451b8a521398b10e5b86153141fbf) switched from state CANCELLING to CANCELED.
2021-04-25 08:53:41,115 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping checkpoint coordinator for job fcc451b8a521398b10e5b86153141fbf.
2021-04-25 08:53:41,115 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
2021-04-25 08:53:41,115 INFO org.apache.flink.runtime.checkpoint.CompletedCheckpoint [] - Checkpoint with ID 1 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-1' not discarded.
2021-04-25 08:53:41,115 INFO org.apache.flink.runtime.checkpoint.CompletedCheckpoint [] - Checkpoint with ID 2 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-2' not discarded.
2021-04-25 08:53:41,116 INFO org.apache.flink.runtime.checkpoint.CompletedCheckpoint [] - Checkpoint with ID 3 at 'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-3' not discarded.
2021-04-25 08:53:41,137 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job fcc451b8a521398b10e5b86153141fbf reached globally terminal state CANCELED.
2021-04-25 08:53:41,148 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job v2_ods_device_action_log(fcc451b8a521398b10e5b86153141fbf).
2021-04-25 08:53:41,151 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending SlotPool.
2021-04-25 08:53:41,151 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection 5bdeb8d0f65a90ecdfafd7f102050b19: JobManager is shutting down..
2021-04-25 08:53:41,151 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping SlotPool.
2021-04-25 08:53:41,151 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager 00000000000000000000000000000000@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_3 for job fcc451b8a521398b10e5b86153141fbf from the resource manager.
2021-04-25 08:53:41,178 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Could not archive completed job v2_ods_device_action_log(fcc451b8a521398b10e5b86153141fbf) to the history server.
java.util.concurrent.CompletionException: java.lang.ExceptionInInitializerError
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_265]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) [?:1.8.0_265]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1643) [?:1.8.0_265]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_265]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_265]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: java.lang.ExceptionInInitializerError
at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.lambda$archiveExecutionGraph$0(JsonResponseHistoryServerArchivist.java:55) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50) ~[template-common-jar-0.0.1.jar:?]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) ~[?:1.8.0_265]
... 3 more
Caused by: java.lang.IllegalStateException: zip file closed
at java.util.zip.ZipFile.ensureOpen(ZipFile.java:686) ~[?:1.8.0_265]
at java.util.zip.ZipFile.getEntry(ZipFile.java:315) ~[?:1.8.0_265]
at java.util.jar.JarFile.getEntry(JarFile.java:240) ~[?:1.8.0_265]
at sun.net.www.protocol.jar.URLJarFile.getEntry(URLJarFile.java:128) ~[?:1.8.0_265]
at java.util.jar.JarFile.getJarEntry(JarFile.java:223) ~[?:1.8.0_265]
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1054) ~[?:1.8.0_265]
at sun.misc.URLClassPath.getResource(URLClassPath.java:249) ~[?:1.8.0_265]
at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[?:1.8.0_265]
at java.net.URLClassLoader$1.run(URLClassLoader.java:363) ~[?:1.8.0_265]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_265]
at java.net.URLClassLoader.findClass(URLClassLoader.java:362) ~[?:1.8.0_265]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_265]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_265]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_265]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_265]
at java.lang.Class.forName(Class.java:264) ~[?:1.8.0_265]
at org.apache.logging.log4j.util.LoaderUtil.loadClass(LoaderUtil.java:168) ~[log4j-api-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLogger.createConverter(Log4jLogger.java:416) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLogger.<init>(Log4jLogger.java:54) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLoggerFactory.newLogger(Log4jLoggerFactory.java:39) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLoggerFactory.newLogger(Log4jLoggerFactory.java:30) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:54) ~[log4j-api-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:30) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:277) ~[template-common-jar-0.0.1.jar:?]
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:288) ~[template-common-jar-0.0.1.jar:?]
at org.apache.flink.runtime.history.FsJobArchivist.<clinit>(FsJobArchivist.java:50) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.lambda$archiveExecutionGraph$0(JsonResponseHistoryServerArchivist.java:55) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50) ~[template-common-jar-0.0.1.jar:?]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) ~[?:1.8.0_265]
... 3 more
>>>>>
And then I get the following error log whenever I run a new job, unless I restart the cluster:
>>>>>
2021-04-25 08:54:06,711 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: true)
2021-04-25 08:54:06,715 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using predefined options: DEFAULT.
2021-04-25 08:54:06,715 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using default options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.
2021-04-25 08:54:06,722 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Exception occurred in REST handler: Could not execute application.
>>>>>
Re: Re: Re: Wrong Kafka connector options make the cluster unable to run any job
Posted by cxydevelop <cx...@163.com>.
Oh, I was wrong again: the last test was run on Flink 1.12.2, not Flink 1.11.2.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Re: Wrong Kafka connector options make the cluster unable to run any job
Posted by chenxuying <cx...@163.com>.
I had tested the job in Flink 1.11.2 and Flink 1.12.2. The error log I posted before was from the Flink 1.11.2 cluster.
Now I am running the job in Flink 1.11.2.
1. The wrong Kafka connector options
The IP is right, but the port is wrong:
```
CREATE TABLE KafkaTable (
message STRING
) WITH (
'connector' = 'kafka',
'topic' = 'filebeat_json_install_log',
'properties.bootstrap.servers' = '192.168.0.77:9093',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
```
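Since a bad `properties.bootstrap.servers` value only surfaces after the 60s metadata/position timeout, it can help to fail fast by probing the broker address before submitting the job. A minimal sketch using a plain TCP check (the host and port are the example values from the DDL above; a real check would also need to speak the Kafka protocol):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerProbe {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean reachable(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers both a wrong IP (connect timeout) and a wrong port (refused).
            return false;
        }
    }

    public static void main(String[] args) {
        // Broker address taken from the CREATE TABLE example above.
        System.out.println(reachable("192.168.0.77", 9093, 2_000));
    }
}
```

Running such a probe before `INSERT INTO`/job submission turns a 60-second silent hang into an immediate, explicit failure.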
2. Job details in the Flink web UI
The log in the Root Exception tab is as below:
```
2021-04-27 15:59:11
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition filebeat_json_install_log-3 could be determined
```
3. Logs in the Job Manager
The Job Manager prints logs continuously, as below:
```
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition filebeat_json_install_log-3 could be determined
2021-04-27 08:03:16,162 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-04-27 08:03:16,163 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-04-27 08:03:16,163 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job v2_ods_device_action_log (876dbcddcec696d42ed887512dacdf22) switched from state RUNNING to RESTARTING.
2021-04-27 08:03:17,163 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job v2_ods_device_action_log (876dbcddcec696d42ed887512dacdf22) switched from state RESTARTING to RUNNING.
2021-04-27 08:03:17,164 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 876dbcddcec696d42ed887512dacdf22 from Checkpoint 6 @ 1619510548493 for 876dbcddcec696d42ed887512dacdf22 located at oss://tanwan-datahub/test/flinksql/checkpoints/876dbcddcec696d42ed887512dacdf22/chk-6.
2021-04-27 08:03:17,165 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
2021-04-27 08:03:17,165 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: ....time, pk_id, key_id, idfv, media_site_id]) (1/1) (278dd023107c2fd3f2b42383e0c01794) switched from CREATED to SCHEDULED.
2021-04-27 08:03:17,165 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: ...) (1/1) (278dd023107c2fd3f2b42383e0c01794) switched from SCHEDULED to DEPLOYING.
2021-04-27 08:03:17,166 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: TableSourceScan(ta....c2fd3f2b42383e0c01794 to 192.168.3.64:6122-55a668 @ 192.168.3.64 (dataPort=34077) with allocation id 091b8c459bd00a2deaea398a41c831ab
2021-04-27 08:03:17,176 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[d...3e0c01794) switched from DEPLOYING to RUNNING.
```
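The RUNNING to RESTARTING to RUNNING loop above is driven by the cluster's restart strategy; with checkpointing enabled, Flink defaults to fixed-delay restarts with a very large attempt count. If the job should fail instead of retrying forever, a bounded strategy can be set in flink-conf.yaml (the values below are illustrative):

```yaml
# flink-conf.yaml: give up after a few attempts instead of restarting indefinitely
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```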
4. Cancelling the job
When I cancel the job, the Job Manager prints logs as below:
```
2021-04-27 08:11:18,190 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 876dbcddcec696d42ed887512dacdf22 reached globally terminal state CANCELED.
2021-04-27 08:11:18,196 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job v2_ods_device_action_log(876dbcddcec696d42ed887512dacdf22).
2021-04-27 08:11:18,197 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending SlotPool.
2021-04-27 08:11:18,197 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection 65303b0e98faaa00ada09ad7271be558: Stopping JobMaster for job v2_ods_device_action_log(876dbcddcec696d42ed887512dacdf22)..
2021-04-27 08:11:18,197 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping SlotPool.
2021-04-27 08:11:18,197 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager 00000000000000000000000000000000@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_3 for job 876dbcddcec696d42ed887512dacdf22 from the resource manager.
2021-04-27 08:11:18,216 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname could be resolved for the IP address 192.168.3.64, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
2021-04-27 08:11:18,271 INFO org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss [] - [Server]Unable to execute HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: 6087C726766D47343487BE32
[HostId]: null
2021-04-27 08:11:18,275 INFO org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss [] - [Server]Unable to execute HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: 6087C726766D473434ADBE32
[HostId]: null
2021-04-27 08:11:18,280 WARN org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss [] - Unable to parse response error: zip file closed
2021-04-27 08:11:18,281 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Could not archive completed job v2_ods_device_action_log(876dbcddcec696d42ed887512dacdf22) to the history server.
java.util.concurrent.CompletionException: org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSException: Failed to parse the response result.
[ErrorCode]: InvalidResponse
[RequestId]: 6087C726766D473434B3BE32
[HostId]: null
[ResponseError]:
zip file closed
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_282]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) [?:1.8.0_282]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1643) [?:1.8.0_282]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
Caused by: org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSException: Failed to parse the response result.
[ErrorCode]: InvalidResponse
[RequestId]: 6087C726766D473434B3BE32
[HostId]: null
[ResponseError]:
zip file closed
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.utils.ExceptionFactory.createOSSException(ExceptionFactory.java:110) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.utils.ExceptionFactory.createInvalidResponseException(ExceptionFactory.java:92) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.utils.ExceptionFactory.createInvalidResponseException(ExceptionFactory.java:81) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:150) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:102) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSBucketOperation.listObjects(OSSBucketOperation.java:411) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.listObjects(OSSClient.java:443) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.listObjects(AliyunOSSFileSystemStore.java:506) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:264) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.create(AliyunOSSFileSystem.java:112) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:154) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37) ~[flink-oss-fs-hadoop-1.12.2.jar:1.12.2]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:168) ~[template-common-jar-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.history.FsJobArchivist.archiveJob(FsJobArchivist.java:73) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.lambda$archiveExecutionGraph$0(JsonResponseHistoryServerArchivist.java:57) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) ~[template-common-jar-1.0-SNAPSHOT.jar:?]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) ~[?:1.8.0_282]
... 3 more
```
5. The exception that occurred in the REST handler
When I ran a new job, I got logs as below:
```
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
... 7 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not instantiate the executor. Make sure a planner module is on the classpath
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
... 10 more
Caused by: org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:176)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:138)
at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:113)
at com.cxydevelop.flinkdemo.tablesql.example.Gen2Print.socket2print(Gen2Print.java:18)
at com.cxydevelop.flinkdemo.tablesql.example.Gen2Print.main(Gen2Print.java:13)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
... 13 more
Caused by: java.lang.IllegalStateException: zip file closed
at java.util.zip.ZipFile.ensureOpen(ZipFile.java:686)
at java.util.zip.ZipFile.getEntry(ZipFile.java:315)
at java.util.jar.JarFile.getEntry(JarFile.java:240)
at sun.net.www.protocol.jar.URLJarFile.getEntry(URLJarFile.java:128)
at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1054)
at sun.misc.URLClassPath$JarLoader.findResource(URLClassPath.java:1032)
at sun.misc.URLClassPath$1.next(URLClassPath.java:277)
at sun.misc.URLClassPath$1.hasMoreElements(URLClassPath.java:287)
at java.net.URLClassLoader$3$1.run(URLClassLoader.java:604)
at java.net.URLClassLoader$3$1.run(URLClassLoader.java:602)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader$3.next(URLClassLoader.java:601)
at java.net.URLClassLoader$3.hasMoreElements(URLClassLoader.java:626)
at sun.misc.CompoundEnumeration.next(CompoundEnumeration.java:45)
at sun.misc.CompoundEnumeration.hasMoreElements(CompoundEnumeration.java:54)
at org.apache.flink.util.ChildFirstClassLoader.getResources(ChildFirstClassLoader.java:110)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
at java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:348)
at java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
at java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:194)
at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:164)
at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:122)
at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:50)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:167)
... 22 more
```
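The recurring `java.lang.IllegalStateException: zip file closed` suggests that the JAR backing a classloader was closed while classes or resources were still being resolved through it. A standalone sketch that reproduces the same exception with a plain `URLClassLoader` on JDK 8 (the jar and entry names are made up for the demo):

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

public class ZipClosedDemo {

    /** Builds a throwaway jar with a single entry and returns its URL. */
    static URL buildTempJar() throws Exception {
        Path jar = Files.createTempFile("demo", ".jar");
        try (JarOutputStream out = new JarOutputStream(Files.newOutputStream(jar))) {
            out.putNextEntry(new JarEntry("marker.txt"));
            out.write("hello".getBytes());
            out.closeEntry();
        }
        return jar.toUri().toURL();
    }

    public static void main(String[] args) throws Exception {
        URLClassLoader loader = new URLClassLoader(new URL[]{buildTempJar()}, null);

        // While the loader is open, the resource resolves normally.
        System.out.println("before close: " + (loader.getResource("marker.txt") != null));

        // Flink closes the user-code classloader once a job reaches a terminal
        // state; anything that still resolves classes or resources through it
        // afterwards hits the closed underlying jar.
        loader.close();

        try {
            loader.getResource("marker.txt");
            System.out.println("after close: lookup returned without an exception");
        } catch (IllegalStateException e) {
            // On JDK 8 this carries the same "zip file closed" message
            // seen in the stack traces above.
            System.out.println("after close: " + e.getMessage());
        }
    }
}
```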
At 2021-04-27 02:03:05, "Robert Metzger" <rm...@apache.org> wrote:
Thanks a lot for your message. This could be a bug in Flink. It seems that the archival of the execution graph is failing because some classes are unloaded.
What I observe from your stack traces is that some classes are loaded from flink-dist_2.11-1.11.2.jar, while other classes are loaded from template-common-jar-0.0.1. Maybe Flink is closing the usercode classloader, and this is causing the exception during the archival of the execution graph. Can you make sure that the core Flink classes are only in your classpath once (in flink-dist), and the template-common-jar-0.0.1 doesn't contain the runtime Flink classes? (for example by setting the Flink dependencies to provided when using the maven-shade-plugin).
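In Maven terms, that suggestion would look roughly like the following in the job's pom.xml (a sketch; the exact artifact ids depend on which Flink modules the job actually uses):

```xml
<!-- Keep core Flink classes out of the fat jar so that only flink-dist
     provides them at runtime. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.12.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.12.2</version>
    <scope>provided</scope>
</dependency>
```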
For the issue while submitting the job, I cannot provide any further help, because you haven't posted the exception that occurred in the REST handler. Could you post that exception here as well?
Best wishes,
Robert
On Sun, Apr 25, 2021 at 2:44 PM chenxuying <cx...@163.com> wrote:
at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50) ~[template-common-jar-0.0.1.jar:?]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) ~[?:1.8.0_265]
... 3 more
Caused by: java.lang.IllegalStateException: zip file closed
at java.util.zip.ZipFile.ensureOpen(ZipFile.java:686) ~[?:1.8.0_265]
at java.util.zip.ZipFile.getEntry(ZipFile.java:315) ~[?:1.8.0_265]
at java.util.jar.JarFile.getEntry(JarFile.java:240) ~[?:1.8.0_265]
at sun.net.www.protocol.jar.URLJarFile.getEntry(URLJarFile.java:128) ~[?:1.8.0_265]
at java.util.jar.JarFile.getJarEntry(JarFile.java:223) ~[?:1.8.0_265]
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1054) ~[?:1.8.0_265]
at sun.misc.URLClassPath.getResource(URLClassPath.java:249) ~[?:1.8.0_265]
at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[?:1.8.0_265]
at java.net.URLClassLoader$1.run(URLClassLoader.java:363) ~[?:1.8.0_265]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_265]
at java.net.URLClassLoader.findClass(URLClassLoader.java:362) ~[?:1.8.0_265]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_265]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_265]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_265]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_265]
at java.lang.Class.forName(Class.java:264) ~[?:1.8.0_265]
at org.apache.logging.log4j.util.LoaderUtil.loadClass(LoaderUtil.java:168) ~[log4j-api-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLogger.createConverter(Log4jLogger.java:416) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLogger.<init>(Log4jLogger.java:54) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLoggerFactory.newLogger(Log4jLoggerFactory.java:39) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLoggerFactory.newLogger(Log4jLoggerFactory.java:30) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:54) ~[log4j-api-2.12.1.jar:2.12.1]
at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:30) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:277) ~[template-common-jar-0.0.1.jar:?]
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:288) ~[template-common-jar-0.0.1.jar:?]
at org.apache.flink.runtime.history.FsJobArchivist.<clinit>(FsJobArchivist.java:50) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.lambda$archiveExecutionGraph$0(JsonResponseHistoryServerArchivist.java:55) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50) ~[template-common-jar-0.0.1.jar:?]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) ~[?:1.8.0_265]
... 3 more
>>>>>
After that, I get the following error log whenever I submit a new job, unless I restart the cluster:
>>>>>
2021-04-25 08:54:06,711 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: true)
2021-04-25 08:54:06,715 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using predefined options: DEFAULT.
2021-04-25 08:54:06,715 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using default options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.
2021-04-25 08:54:06,722 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Exception occurred in REST handler: Could not execute application.
>>>>>
Finally, I want to know whether it is normal that an exception in one job can make other jobs fail, or prevent the JobManager from running any job at all.
Re: The wrong Options of Kafka Connector, will make the cluster can not run any job
Posted by Robert Metzger <rm...@apache.org>.
Thanks a lot for your message. This could be a bug in Flink. It seems that
the archival of the execution graph is failing because some classes are
unloaded.
What I observe from your stack traces is that some classes are loaded from
flink-dist_2.11-1.11.2.jar, while other classes are loaded from
template-common-jar-0.0.1. Maybe Flink is closing the usercode classloader,
and this is causing the exception during the archival of the execution
graph. Can you make sure that the core Flink classes are only in your
classpath once (in flink-dist), and the template-common-jar-0.0.1 doesn't
contain the runtime Flink classes? (for example by setting the Flink
dependencies to provided when using the maven-shade-plugin).
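The scoping described above can be sketched in the user project's pom.xml. This is only an illustration: the artifact ids below are the standard Flink 1.12 module names, and which modules template-common-jar-0.0.1 actually depends on is an assumption.

```xml
<!-- Sketch: mark Flink runtime/API dependencies as provided so they are NOT
     bundled into the shaded user jar; the session cluster's flink-dist
     already ships them on the classpath. -->
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.12.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.12.2</version>
    <scope>provided</scope>
  </dependency>
  <!-- Connector dependencies (e.g. flink-connector-kafka) usually stay at
       the default compile scope so they ARE shaded into the job jar. -->
</dependencies>
```

After rebuilding, you can check that the user jar no longer bundles runtime classes with something like `jar tf template-common-jar-0.0.1.jar | grep 'org/apache/flink/runtime'` — it should print nothing.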
For the issue while submitting the job, I cannot provide any further
help, because you haven't posted the exception that occurred in the REST
handler. Could you post that exception here as well?
Best wishes,
Robert
On Sun, Apr 25, 2021 at 2:44 PM chenxuying <cx...@163.com> wrote: