You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Leonard Xu (Jira)" <ji...@apache.org> on 2022/11/24 03:05:00 UTC

[jira] [Updated] (FLINK-30181) MultiInputCheckpointingTimeBenchmark.checkpointMultiInput benchmark failed

     [ https://issues.apache.org/jira/browse/FLINK-30181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leonard Xu updated FLINK-30181:
-------------------------------
    Component/s: Runtime / Checkpointing

> MultiInputCheckpointingTimeBenchmark.checkpointMultiInput benchmark failed
> --------------------------------------------------------------------------
>
>                 Key: FLINK-30181
>                 URL: https://issues.apache.org/jira/browse/FLINK-30181
>             Project: Flink
>          Issue Type: Bug
>          Components: Benchmarks, Runtime / Checkpointing
>    Affects Versions: 1.17.0
>            Reporter: Yanfei Lei
>            Priority: Major
>
> {code:java}
> 08:30:14  # JMH version: 1.19
> 08:30:14  # VM version: JDK 11.0.16, VM 11.0.16+8-adhoc.root.jdk11u
> 08:30:14  # VM invoker: /usr/lib/jvm/openlogic-openjdk-11.0.16+8-linux-x64/bin/java
> 08:30:14  # VM options: -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl
> 08:30:14  # Warmup: 10 iterations, 1 s each
> 08:30:14  # Measurement: 10 iterations, 1 s each
> 08:30:14  # Timeout: 10 min per iteration
> 08:30:14  # Threads: 1 thread, will synchronize iterations
> 08:30:14  # Benchmark mode: Throughput, ops/time
> 08:30:14  # Benchmark: org.apache.flink.benchmark.MultiInputCheckpointingTimeBenchmark.checkpointMultiInput
> 08:30:14  
> 08:30:14  # Run progress: 41.07% complete, ETA 01:16:40
> 08:30:14  # Fork: 1 of 3
> 08:30:32  # Warmup Iteration   1: 4.561 ops/s
> 08:30:32  # Warmup Iteration   2: 3.926 ops/s
> 08:30:33  # Warmup Iteration   3: 2.986 ops/s
> 08:30:34  # Warmup Iteration   4: 2.858 ops/s
> 08:30:35  # Warmup Iteration   5: 2.806 ops/s
> 08:30:36  # Warmup Iteration   6: 2.708 ops/s
> 08:30:37  # Warmup Iteration   7: 2.638 ops/s
> 08:30:39  # Warmup Iteration   8: 2.686 ops/s
> 08:30:40  # Warmup Iteration   9: 2.606 ops/s
> 08:30:41  # Warmup Iteration  10: 2.729 ops/s
> 08:30:42  Iteration   1: 2.665 ops/s
> 08:30:43  Iteration   2: 2.686 ops/s
> 08:30:44  Iteration   3: 2.708 ops/s
> 08:30:46  Iteration   4: 2.733 ops/s
> 08:30:47  Iteration   5: 2.759 ops/s
> 08:30:47  Iteration   6: 2.805 ops/s
> 08:30:48  Iteration   7: 2.791 ops/s
> 08:30:50  Iteration   8: 2.762 ops/s
> 08:30:51  Iteration   9: 2.666 ops/s
> 08:30:52  Iteration  10: 2.667 ops/s
> 08:30:52  
> 08:30:52  # Run progress: 41.67% complete, ETA 01:15:43
> 08:30:52  # Fork: 2 of 3
> 08:31:11  # Warmup Iteration   1: 4.756 ops/s
> 08:31:11  # Warmup Iteration   2: 4.163 ops/s
> 08:31:12  # Warmup Iteration   3: 2.977 ops/s
> 08:31:13  # Warmup Iteration   4: 2.628 ops/s
> 08:31:14  # Warmup Iteration   5: 2.614 ops/s
> 08:31:15  # Warmup Iteration   6: 2.623 ops/s
> 08:31:16  # Warmup Iteration   7: 2.625 ops/s
> 08:31:17  # Warmup Iteration   8: 2.538 ops/s
> 08:31:19  # Warmup Iteration   9: 2.754 ops/s
> 08:31:20  # Warmup Iteration  10: 2.701 ops/s
> 08:31:21  Iteration   1: 2.748 ops/s
> 08:31:22  Iteration   2: 2.691 ops/s
> 08:31:23  Iteration   3: 2.654 ops/s
> 08:31:24  Iteration   4: 2.683 ops/s
> 08:31:26  Iteration   5: 2.527 ops/s
> 08:31:27  Iteration   6: 2.729 ops/s
> 08:31:28  Iteration   7: 2.683 ops/s
> 08:31:29  Iteration   8: 2.642 ops/s
> 08:31:30  Iteration   9: 2.653 ops/s
> 08:36:37  01:36:31,596 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Fatal error occurred in TaskExecutor akka://flink/user/rpc/taskmanager_9.
> 08:36:37  org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration PT5M. This indicates a problem with this instance. Terminating now.
> 08:36:37  	at org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1524) ~[flink-runtime-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
> 08:36:37  	at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$17(TaskExecutor.java:1509) ~[flink-runtime-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
> 08:36:37  	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) ~[flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) ~[flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at akka.actor.ActorCell.invoke(ActorCell.scala:547) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_8f0f0fd6-15b6-408d-9f86-28585d436b16.jar:1.17-SNAPSHOT]
> 08:36:37  	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
> 08:36:37  	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) [?:?]
> 08:36:37  	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
> 08:36:37  	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
> 08:36:37  	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) [?:?]
> 08:41:44  Iteration  10: (*interrupt*) <failure>
> 08:41:44  
> 08:41:44  java.lang.InterruptedException
> 08:41:44  	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:385)
> 08:41:44  	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
> 08:41:44  	at org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:36)
> 08:41:44  	at org.apache.flink.benchmark.FlinkEnvironmentContext.tearDown(FlinkEnvironmentContext.java:87)
> 08:41:44  	at org.apache.flink.benchmark.generated.MultiInputCheckpointingTimeBenchmark_checkpointMultiInput_jmhTest.checkpointMultiInput_Throughput(MultiInputCheckpointingTimeBenchmark_checkpointMultiInput_jmhTest.java:99)
> 08:41:44  	at jdk.internal.reflect.GeneratedMethodAccessor59.invoke(Unknown Source)
> 08:41:44  	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:44  	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:44  	at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:453)
> 08:41:44  	at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:437)
> 08:41:44  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 08:41:44  	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 08:41:44  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 08:41:44  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 08:41:44  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 08:41:44  	at java.base/java.lang.Thread.run(Thread.java:829)
> 08:41:44  
> 08:41:44  
> 08:41:44  <JMH had finished, but forked VM did not exit, are there stray running threads? Waiting 24 seconds more...>
> 08:41:44  
> 08:41:44  Non-finished threads:
> 08:41:44  
> 08:41:44  Thread[co-map -> Sink: Unnamed (3/4)#0,5,Flink Task Threads]
> 08:41:44    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:990)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:44    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:44    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:44    at java.base@11.0.16/java.util.ArrayList.writeObject(ArrayList.java:897)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:44    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:44    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
> 08:41:44    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:996)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:44    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:44    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:44    at app//org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:548)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1039)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:837)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 08:41:44    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:44  
> 08:41:44  Thread[flink-akka.actor.default-dispatcher-10,5,main]
> 08:41:44    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:44  
> 08:41:44  Thread[flink-metrics-scheduler-1,5,main]
> 08:41:44    at java.base@11.0.16/java.lang.Thread.sleep(Native Method)
> 08:41:44    at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:90)
> 08:41:44    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:300)
> 08:41:44    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270)
> 08:41:44    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:44  
> 08:41:44  Thread[co-map -> Sink: Unnamed (2/4)#0,5,Flink Task Threads]
> 08:41:44    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:990)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:44    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:44    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
> 08:41:44    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:996)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:44    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:44    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:44    at app//org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:548)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1039)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:837)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 08:41:44    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:44  
> 08:41:44  Thread[flink-metrics-akka.actor.internal-dispatcher-2,5,main]
> 08:41:44    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:44  
> 08:41:44  Thread[flink-akka.actor.internal-dispatcher-4,5,main]
> 08:41:44    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:44  
> 08:41:44  Thread[DestroyJavaVM,5,main]
> 08:41:44  
> 08:41:44  Thread[flink-scheduler-1,5,main]
> 08:41:44    at java.base@11.0.16/java.lang.Thread.sleep(Native Method)
> 08:41:44    at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:90)
> 08:41:44    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:300)
> 08:41:44    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270)
> 08:41:44    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:44  
> 08:41:44  Thread[pool-32-thread-1,5,main]
> 08:41:44    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2081)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1170)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1054)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1114)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 08:41:44    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:44  
> 08:41:44  
> 08:41:44  <JMH had finished, but forked VM did not exit, are there stray running threads? Waiting 19 seconds more...>
> 08:41:44  
> 08:41:44  Non-finished threads:
> 08:41:44  
> 08:41:44  Thread[co-map -> Sink: Unnamed (3/4)#0,5,Flink Task Threads]
> 08:41:44    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:990)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:44    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:44    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:44    at java.base@11.0.16/java.util.ArrayList.writeObject(ArrayList.java:897)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:44    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:44    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
> 08:41:44    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:996)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:44    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:44    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:44    at app//org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:548)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1039)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:837)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 08:41:44    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:44  
> 08:41:44  Thread[flink-akka.actor.default-dispatcher-10,5,main]
> 08:41:44    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:44  
> 08:41:44  Thread[flink-metrics-scheduler-1,5,main]
> 08:41:44    at java.base@11.0.16/java.lang.Thread.sleep(Native Method)
> 08:41:44    at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:90)
> 08:41:44    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:300)
> 08:41:44    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270)
> 08:41:44    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:44  
> 08:41:44  Thread[co-map -> Sink: Unnamed (2/4)#0,5,Flink Task Threads]
> 08:41:44    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:990)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:44    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:44    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
> 08:41:44    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:996)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:44    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:44    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:44    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:44    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:44    at app//org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:548)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:44    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1039)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:837)
> 08:41:44    at app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 08:41:44    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:44  
> 08:41:44  Thread[flink-metrics-akka.actor.internal-dispatcher-2,5,main]
> 08:41:44    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:44  
> 08:41:44  Thread[flink-akka.actor.internal-dispatcher-4,5,main]
> 08:41:44    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:44  
> 08:41:44  Thread[DestroyJavaVM,5,main]
> 08:41:44  
> 08:41:44  Thread[flink-scheduler-1,5,main]
> 08:41:44    at java.base@11.0.16/java.lang.Thread.sleep(Native Method)
> 08:41:44    at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:90)
> 08:41:44    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:300)
> 08:41:44    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270)
> 08:41:44    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:44  
> 08:41:44  Thread[pool-32-thread-1,5,main]
> 08:41:44    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2081)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1170)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1054)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1114)
> 08:41:44    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 08:41:44    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:44  
> 08:41:44  
> 08:41:45  <JMH had finished, but forked VM did not exit, are there stray running threads? Waiting 14 seconds more...>
> 08:41:45  
> 08:41:45  Non-finished threads:
> 08:41:45  
> 08:41:45  Thread[co-map -> Sink: Unnamed (3/4)#0,5,Flink Task Threads]
> 08:41:45    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:990)
> 08:41:45    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:45    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:45    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:45    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:45    at java.base@11.0.16/java.util.ArrayList.writeObject(ArrayList.java:897)
> 08:41:45    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
> 08:41:45    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:45    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:45    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
> 08:41:45    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:996)
> 08:41:45    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:45    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:45    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:45    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:45    at app//org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:548)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
> 08:41:45    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
> 08:41:45    at app//org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1039)
> 08:41:45    at app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:837)
> 08:41:45    at app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 08:41:45    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:45  
> 08:41:45  Thread[flink-akka.actor.default-dispatcher-10,5,main]
> 08:41:45    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:45  
> 08:41:45  Thread[flink-metrics-scheduler-1,5,main]
> 08:41:45    at java.base@11.0.16/java.lang.Thread.sleep(Native Method)
> 08:41:45    at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:90)
> 08:41:45    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:300)
> 08:41:45    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270)
> 08:41:45    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:45  
> 08:41:45  Thread[co-map -> Sink: Unnamed (2/4)#0,5,Flink Task Threads]
> 08:41:45    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:990)
> 08:41:45    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:45    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:45    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:45    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
> 08:41:45    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:996)
> 08:41:45    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:45    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:45    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:45    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:45    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:45    at app//org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:548)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:45    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:45    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
> 08:41:45    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
> 08:41:45    at app//org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1039)
> 08:41:45    at app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:837)
> 08:41:45    at app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 08:41:45    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:45  
> 08:41:45  Thread[flink-metrics-akka.actor.internal-dispatcher-2,5,main]
> 08:41:45    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:45  
> 08:41:45  Thread[flink-akka.actor.internal-dispatcher-4,5,main]
> 08:41:45    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:45  
> 08:41:45  Thread[DestroyJavaVM,5,main]
> 08:41:45  
> 08:41:45  Thread[flink-scheduler-1,5,main]
> 08:41:45    at java.base@11.0.16/java.lang.Thread.sleep(Native Method)
> 08:41:45    at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:90)
> 08:41:45    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:300)
> 08:41:45    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270)
> 08:41:45    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:45  
> 08:41:45  Thread[pool-32-thread-1,5,main]
> 08:41:45    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2081)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1170)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1054)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1114)
> 08:41:45    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 08:41:45    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:45  
> 08:41:45  
> 08:41:50  <JMH had finished, but forked VM did not exit, are there stray running threads? Waiting 9 seconds more...>
> 08:41:50  
> 08:41:50  Non-finished threads:
> 08:41:50  
> 08:41:50  Thread[co-map -> Sink: Unnamed (3/4)#0,5,Flink Task Threads]
> 08:41:50    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:990)
> 08:41:50    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:50    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:50    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:50    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:50    at java.base@11.0.16/java.util.ArrayList.writeObject(ArrayList.java:897)
> 08:41:50    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
> 08:41:50    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:50    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:50    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
> 08:41:50    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:996)
> 08:41:50    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:50    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:50    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:50    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:50    at app//org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:548)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
> 08:41:50    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
> 08:41:50    at app//org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1039)
> 08:41:50    at app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:837)
> 08:41:50    at app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 08:41:50    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:50  
> 08:41:50  Thread[flink-akka.actor.default-dispatcher-10,5,main]
> 08:41:50    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:50  
> 08:41:50  Thread[flink-metrics-scheduler-1,5,main]
> 08:41:50    at java.base@11.0.16/java.lang.Thread.sleep(Native Method)
> 08:41:50    at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:90)
> 08:41:50    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:300)
> 08:41:50    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270)
> 08:41:50    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:50  
> 08:41:50  Thread[co-map -> Sink: Unnamed (2/4)#0,5,Flink Task Threads]
> 08:41:50    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:990)
> 08:41:50    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:50    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:50    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:50    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
> 08:41:50    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:996)
> 08:41:50    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:50    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:50    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:50    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:50    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:50    at app//org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:548)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:50    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:50    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
> 08:41:50    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
> 08:41:50    at app//org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1039)
> 08:41:50    at app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:837)
> 08:41:50    at app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 08:41:50    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:50  
> 08:41:50  Thread[flink-metrics-akka.actor.internal-dispatcher-2,5,main]
> 08:41:50    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:50  
> 08:41:50  Thread[flink-akka.actor.internal-dispatcher-4,5,main]
> 08:41:50    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:50  
> 08:41:50  Thread[DestroyJavaVM,5,main]
> 08:41:50  
> 08:41:50  Thread[flink-scheduler-1,5,main]
> 08:41:50    at java.base@11.0.16/java.lang.Thread.sleep(Native Method)
> 08:41:50    at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:90)
> 08:41:50    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:300)
> 08:41:50    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270)
> 08:41:50    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:50  
> 08:41:50  Thread[pool-32-thread-1,5,main]
> 08:41:50    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2081)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1170)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1054)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1114)
> 08:41:50    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 08:41:50    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:50  
> 08:41:50  
> 08:41:56  <JMH had finished, but forked VM did not exit, are there stray running threads? Waiting 4 seconds more...>
> 08:41:56  
> 08:41:56  Non-finished threads:
> 08:41:56  
> 08:41:56  Thread[co-map -> Sink: Unnamed (3/4)#0,5,Flink Task Threads]
> 08:41:56    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:990)
> 08:41:56    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:56    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:56    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:56    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:56    at java.base@11.0.16/java.util.ArrayList.writeObject(ArrayList.java:897)
> 08:41:56    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
> 08:41:56    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:56    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:56    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
> 08:41:56    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:996)
> 08:41:56    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:56    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:56    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:56    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:56    at app//org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:548)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
> 08:41:56    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
> 08:41:56    at app//org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1039)
> 08:41:56    at app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:837)
> 08:41:56    at app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 08:41:56    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:56  
> 08:41:56  Thread[flink-akka.actor.default-dispatcher-10,5,main]
> 08:41:56    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:56  
> 08:41:56  Thread[flink-metrics-scheduler-1,5,main]
> 08:41:56    at java.base@11.0.16/java.lang.Thread.sleep(Native Method)
> 08:41:56    at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:90)
> 08:41:56    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:300)
> 08:41:56    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270)
> 08:41:56    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:56  
> 08:41:56  Thread[co-map -> Sink: Unnamed (2/4)#0,5,Flink Task Threads]
> 08:41:56    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:990)
> 08:41:56    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:56    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:56    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:56    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
> 08:41:56    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:996)
> 08:41:56    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:41:56    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:41:56    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:41:56    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:41:56    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:41:56    at app//org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:548)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:41:56    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:41:56    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
> 08:41:56    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
> 08:41:56    at app//org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1039)
> 08:41:56    at app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:837)
> 08:41:56    at app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 08:41:56    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:56  
> 08:41:56  Thread[flink-metrics-akka.actor.internal-dispatcher-2,5,main]
> 08:41:56    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:56  
> 08:41:56  Thread[flink-akka.actor.internal-dispatcher-4,5,main]
> 08:41:56    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:41:56  
> 08:41:56  Thread[DestroyJavaVM,5,main]
> 08:41:56  
> 08:41:56  Thread[flink-scheduler-1,5,main]
> 08:41:56    at java.base@11.0.16/java.lang.Thread.sleep(Native Method)
> 08:41:56    at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:90)
> 08:41:56    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:300)
> 08:41:56    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270)
> 08:41:56    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:56  
> 08:41:56  Thread[pool-32-thread-1,5,main]
> 08:41:56    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2081)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1170)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1054)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1114)
> 08:41:56    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 08:41:56    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:41:56  
> 08:41:56  
> 08:42:00  <JMH had finished, but forked VM did not exit, are there stray running threads? Waiting 0 seconds more...>
> 08:42:00  
> 08:42:00  Non-finished threads:
> 08:42:00  
> 08:42:00  Thread[co-map -> Sink: Unnamed (3/4)#0,5,Flink Task Threads]
> 08:42:00    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:990)
> 08:42:00    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:42:00    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:42:00    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:42:00    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:42:00    at java.base@11.0.16/java.util.ArrayList.writeObject(ArrayList.java:897)
> 08:42:00    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
> 08:42:00    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:42:00    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:42:00    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
> 08:42:00    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:996)
> 08:42:00    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:42:00    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:42:00    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:42:00    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:42:00    at app//org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:548)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
> 08:42:00    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
> 08:42:00    at app//org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1039)
> 08:42:00    at app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:837)
> 08:42:00    at app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 08:42:00    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:42:00  
> 08:42:00  Thread[flink-akka.actor.default-dispatcher-10,5,main]
> 08:42:00    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:42:00  
> 08:42:00  Thread[flink-metrics-scheduler-1,5,main]
> 08:42:00    at java.base@11.0.16/java.lang.Thread.sleep(Native Method)
> 08:42:00    at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:90)
> 08:42:00    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:300)
> 08:42:00    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270)
> 08:42:00    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:42:00  
> 08:42:00  Thread[co-map -> Sink: Unnamed (2/4)#0,5,Flink Task Threads]
> 08:42:00    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:990)
> 08:42:00    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:42:00    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:42:00    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:42:00    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
> 08:42:00    at java.base@11.0.16/java.lang.Throwable.writeObject(Throwable.java:996)
> 08:42:00    at java.base@11.0.16/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> 08:42:00    at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:42:00    at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
> 08:42:00    at java.base@11.0.16/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
> 08:42:00    at java.base@11.0.16/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
> 08:42:00    at app//org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:548)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
> 08:42:00    at app//org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> 08:42:00    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
> 08:42:00    at app//org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
> 08:42:00    at app//org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1039)
> 08:42:00    at app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:837)
> 08:42:00    at app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 08:42:00    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:42:00  
> 08:42:00  Thread[flink-metrics-akka.actor.internal-dispatcher-2,5,main]
> 08:42:00    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:42:00  
> 08:42:00  Thread[flink-akka.actor.internal-dispatcher-4,5,main]
> 08:42:00    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1628)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> 08:42:00  
> 08:42:00  Thread[DestroyJavaVM,5,main]
> 08:42:00  
> 08:42:00  Thread[flink-scheduler-1,5,main]
> 08:42:00    at java.base@11.0.16/java.lang.Thread.sleep(Native Method)
> 08:42:00    at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:90)
> 08:42:00    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:300)
> 08:42:00    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270)
> 08:42:00    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:42:00  
> 08:42:00  Thread[pool-32-thread-1,5,main]
> 08:42:00    at java.base@11.0.16/jdk.internal.misc.Unsafe.park(Native Method)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2081)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1170)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1054)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1114)
> 08:42:00    at java.base@11.0.16/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 08:42:00    at java.base@11.0.16/java.lang.Thread.run(Thread.java:829)
> 08:42:00  
> 08:42:00  
> 08:42:00  <shutdown timeout of 30 seconds expired, forcing forked VM to exit>
> 08:42:00  Benchmark had encountered error, and fail on error was requested
> 08:42:00  ERROR: org.openjdk.jmh.runner.RunnerException: Benchmark caught the exception
> 08:42:00  	at org.openjdk.jmh.runner.Runner.runBenchmarks(Runner.java:570)
> 08:42:00  	at org.openjdk.jmh.runner.Runner.internalRun(Runner.java:313)
> 08:42:00  	at org.openjdk.jmh.runner.Runner.run(Runner.java:206)
> 08:42:00  	at org.openjdk.jmh.Main.main(Main.java:71)
> 08:42:00  Caused by: org.openjdk.jmh.runner.BenchmarkException: Benchmark error during the run
> 08:42:00  	at org.openjdk.jmh.runner.BenchmarkHandler.runIteration(BenchmarkHandler.java:428)
> 08:42:00  	at org.openjdk.jmh.runner.BaseRunner.runBenchmark(BaseRunner.java:282)
> 08:42:00  	at org.openjdk.jmh.runner.BaseRunner.runBenchmark(BaseRunner.java:234)
> 08:42:00  	at org.openjdk.jmh.runner.BaseRunner.doSingle(BaseRunner.java:139)
> 08:42:00  	at org.openjdk.jmh.runner.BaseRunner.runBenchmarksForked(BaseRunner.java:76)
> 08:42:00  	at org.openjdk.jmh.runner.ForkedRunner.run(ForkedRunner.java:72)
> 08:42:00  	at org.openjdk.jmh.runner.ForkedMain.main(ForkedMain.java:84)
> 08:42:00  	Suppressed: java.lang.InterruptedException
> 08:42:00  		at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:385)
> 08:42:00  		at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
> 08:42:00  		at org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:36)
> 08:42:00  		at org.apache.flink.benchmark.FlinkEnvironmentContext.tearDown(FlinkEnvironmentContext.java:87)
> 08:42:00  		at org.apache.flink.benchmark.generated.MultiInputCheckpointingTimeBenchmark_checkpointMultiInput_jmhTest.checkpointMultiInput_Throughput(MultiInputCheckpointingTimeBenchmark_checkpointMultiInput_jmhTest.java:99)
> 08:42:00  		at jdk.internal.reflect.GeneratedMethodAccessor59.invoke(Unknown Source)
> 08:42:00  		at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:42:00  		at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> 08:42:00  		at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:453)
> 08:42:00  		at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:437)
> 08:42:00  		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 08:42:00  		at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 08:42:00  		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 08:42:00  		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 08:42:00  		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 08:42:00  		at java.base/java.lang.Thread.run(Thread.java:829)
> 08:42:00  [ERROR] Command execution failed.
> 08:42:00  org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
> 08:42:00  	at org.apache.commons.exec.DefaultExecutor.executeInternal(DefaultExecutor.java:404)
> 08:42:00  	at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:166)
> 08:42:00  	at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:804)
> 08:42:00  	at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:751)
> 08:42:00  	at org.codehaus.mojo.exec.ExecMojo.execute(ExecMojo.java:313)
> 08:42:00  	at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
> 08:42:00  	at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> 08:42:00  	at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:154)
> 08:42:00  	at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:146)
> 08:42:00  	at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:117)
> 08:42:00  	at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:81)
> 08:42:00  	at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
> 08:42:00  	at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
> 08:42:00  	at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:309)
> 08:42:00  	at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:194)
> 08:42:00  	at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:107)
> 08:42:00  	at org.apache.maven.cli.MavenCli.execute(MavenCli.java:993)
> 08:42:00  	at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:345)
> 08:42:00  	at org.apache.maven.cli.MavenCli.main(MavenCli.java:191)
> 08:42:00  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 08:42:00  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 08:42:00  	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 08:42:00  	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> 08:42:00  	at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
> 08:42:00  	at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
> 08:42:00  	at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
> 08:42:00  	at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
> 08:42:00  [INFO] ------------------------------------------------------------------------
> 08:42:00  [INFO] BUILD FAILURE
> 08:42:00  [INFO] ------------------------------------------------------------------------
> 08:42:00  [INFO] Total time: 01:05 h
> 08:42:00  [INFO] Finished at: 2022-11-23T01:42:00+01:00
> 08:42:01  [INFO] Final Memory: 106M/416M
> 08:42:01  [INFO] ------------------------------------------------------------------------
> 08:42:01  [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:exec (default-cli) on project benchmark: Command execution failed.: Process exited with an error: 1 (Exit value: 1) -> [Help 1]
> 08:42:01  [ERROR] 
> 08:42:01  [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
> 08:42:01  [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> 08:42:01  [ERROR] 
> 08:42:01  [ERROR] For more information about the errors and possible solutions, please read the following articles:
> 08:42:01  [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
> [Pipeline] }
>  {code}
> [http://codespeed.dak8s.net:8080/job/flink-master-benchmarks-java8/889/consoleFull]
> http://codespeed.dak8s.net:8080/job/flink-master-benchmarks-java11/300/consoleFull



--
This message was sent by Atlassian Jira
(v8.20.10#820010)