You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Alex Vinnik <al...@gmail.com> on 2018/07/24 02:56:26 UTC
Flink 1.5 batch job fails to start
Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink 1.5
and getting a weird exception.
Job reads json from s3a and writes parquet files to s3a with avro model.
Job is uber jar file built with hadoop-aws-2.8.0 in order to have access to
S3AFileSystem class.
Fails here
https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
with
Caused by: java.lang.Exception: Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
failed: unread block data
To be exact it fails right on that line.
https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
Not sure how to resolve this problem. Looking for advice. Let me know if
more info is needed. The full stack trace is below. Thanks.
org.apache.flink.runtime.rest.handler.RestHandlerException:
org.apache.flink.util.FlinkException: Failed to submit job
13a1478cbc7ec20f93f9ee0947856bfd.
at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkException: Failed to submit job
13a1478cbc7ec20f93f9ee0947856bfd.
at
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 29 more
Caused by: org.apache.flink.util.FlinkException: Failed to submit job
13a1478cbc7ec20f93f9ee0947856bfd.
at
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
set up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at
org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at
org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at
org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
initialize task 'DataSink
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
failed: unread block data
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
failed: unread block data
at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more
Re: Flink 1.5 batch job fails to start
Posted by Alex Vinnik <al...@gmail.com>.
Hi Vino,
The data is OK; I double-checked. The input is plain JSON and it can be processed by
the same code compiled and run on Flink 1.3.1. Thanks for the hint about the avro
and parquet versions. I synced my fat jar up with the Flink 1.5.1
avro/parquet versions. Hopes were high that this would resolve the
problem, and one run of the job actually was successful, but it started
failing after that with the same problem. Weird. I will continue to poke
around; it feels like I am so close :)
Best,
-Alex
On Tue, Jul 24, 2018 at 9:08 PM vino yang <ya...@gmail.com> wrote:
> Hi Alex,
>
> Is it possible that the data has been corrupted?
>
> Or have you confirmed that the avro version is consistent in different
> Flink versions?
>
> Also, if you don't upgrade Flink and still use version 1.3.1, can it be
> recovered?
>
> Thanks, vino.
>
>
> 2018-07-25 8:32 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>
>> Vino,
>>
>> Upgraded flink to Hadoop 2.8.1
>>
>> $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
>> entrypoint | grep 'Hadoop version'
>> 2018-07-25T00:19:46.142+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop
>> version: 2.8.1
>>
>> but job still fails to start
>>
>> Ideas?
>>
>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>> d84cccd3bffcba1f243352a5e5ef99a9.
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> ... 4 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>> ... 21 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
>> initialize task 'DataSink
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)':
>> Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>> ... 26 more
>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>> ... 31 more
>> Caused by: java.lang.IllegalStateException: unread block data
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>> at
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>> at
>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>> ... 32 more
>>
>>
>> On Tue, Jul 24, 2018 at 10:32 AM vino yang <ya...@gmail.com> wrote:
>>
>>> Hi Alex,
>>>
>>> Based on your log information, the potential cause is the Hadoop version.
>>> To check whether the exception comes from differing Hadoop versions, I
>>> suggest you match the Hadoop version on both sides.
>>>
>>> You can :
>>>
>>> 1. Upgrade the Hadoop version which the Flink cluster depends on; Flink's
>>> official website provides a binary bundled with Hadoop 2.8.[1]
>>> 2. Downgrade your fat jar's Hadoop client dependency version to match
>>> the Flink cluster's Hadoop dependency version.
>>>
>>> [1]:
>>> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz
>>>
>>> Thanks, vino.
>>>
>>> 2018-07-24 22:59 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>>>
>>>> Hi Till,
>>>>
>>>> Thanks for responding. Below are the entrypoint logs. One thing I noticed
>>>> is "Hadoop version: 2.7.3". My fat jar is built with the Hadoop 2.8 client.
>>>> Could that be the reason for the error? If so, how can I use the same Hadoop
>>>> version 2.8 on the Flink server side? BTW, the job runs fine locally reading from
>>>> the same s3a buckets when executed using createLocalEnvironment via java
>>>> -jar my-fat.jar --input s3a://foo --output s3a://bar
>>>>
>>>> Regarding the Java version: the job is submitted via the Flink UI, so it should
>>>> not be a problem.
>>>>
>>>> Thanks a lot in advance.
>>>>
>>>> 2018-07-24T12:09:38.083+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --------------------------------------------------------------------------------
>>>> 2018-07-24T12:09:38.085+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting
>>>> StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108,
>>>> Date:24.05.2018 @ 14:54:44 UTC)
>>>> 2018-07-24T12:09:38.085+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO OS current
>>>> user: flink
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Current
>>>> Hadoop/Kerberos user: flink
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM: OpenJDK
>>>> 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Maximum heap
>>>> size: 1963 MiBytes
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JAVA_HOME:
>>>> /docker-java-home/jre
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop
>>>> version: 2.7.3
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM Options:
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xms2048m
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xmx2048m
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dcom.amazonaws.sdk.disableCertChecking
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Program
>>>> Arguments:
>>>> 2018-07-24T12:09:38.852+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --configDir
>>>> 2018-07-24T12:09:38.852+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> /opt/flink/conf
>>>> 2018-07-24T12:09:38.852+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --executionMode
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --host
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Classpath:
>>>> /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --------------------------------------------------------------------------------
>>>> 2018-07-24T12:09:38.854+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Registered
>>>> UNIX signal handlers for [TERM, HUP, INT]
>>>> 2018-07-24T12:09:38.895+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting
>>>> StandaloneSessionClusterEntrypoint.
>>>> 2018-07-24T12:09:38.895+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install
>>>> default filesystem.
>>>> 2018-07-24T12:09:38.927+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install
>>>> security context.
>>>> 2018-07-24T12:09:39.034+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Initializing
>>>> cluster services.
>>>> 2018-07-24T12:09:39.059+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Trying to
>>>> start actor system at flink-jobmanager:6123
>>>> 2018-07-24T12:09:40.335+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Actor system
>>>> started at akka.tcp://flink@flink-jobmanager:6123
>>>>
>>>> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Alex,
>>>>>
>>>>> I'm not entirely sure what causes this problem because this is the first
>>>>> time I have seen it.
>>>>>
>>>>> The first question would be whether the problem also arises when using a
>>>>> different Hadoop version.
>>>>>
>>>>> Are you using the same Java versions on the client as well as on the
>>>>> server?
>>>>>
>>>>> Could you provide us with the cluster entrypoint logs?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink
>>>>>> 1.5 and getting a weird exception.
>>>>>>
>>>>>> Job reads json from s3a and writes parquet files to s3a with avro
>>>>>> model. Job is uber jar file built with hadoop-aws-2.8.0 in order to have
>>>>>> access to S3AFileSystem class.
>>>>>>
>>>>>> Fails here
>>>>>>
>>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>>>>>> with
>>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>> failed: unread block data
>>>>>>
>>>>>> To be exact it fails right on that line.
>>>>>>
>>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>>>>>
>>>>>> Not sure how to resolve this problem. Looking for an advice. Let me
>>>>>> know if more info is needed. Full stack is below. Thanks.
>>>>>>
>>>>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>> at
>>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>>> at
>>>>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
>>>>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>>>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>> at
>>>>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>>>>>> at
>>>>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>>>> at
>>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>>>>> at
>>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>>>>> at
>>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>> at
>>>>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>>>>> at
>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> Caused by: java.util.concurrent.CompletionException:
>>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>>>> ... 29 more
>>>>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>>> at
>>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>> ... 4 more
>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>> Could not set up JobManager
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>>>>>> ... 21 more
>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>> Cannot initialize task 'DataSink
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
>>>>>> Deserializing the OutputFormat
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>> failed: unread block data
>>>>>> at
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>>>>> at
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>>>>>> ... 26 more
>>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>> failed: unread block data
>>>>>> at
>>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>>>>> at
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>>>>> ... 31 more
>>>>>> Caused by: java.lang.IllegalStateException: unread block data
>>>>>> at
>>>>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>>>>> at
>>>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>>>>>> at
>>>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>>>>> at
>>>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>>>>> ... 32 more
>>>>>>
>>>>>>
>>>
>
Re: Flink 1.5 batch job fails to start
Posted by Alex Vinnik <al...@gmail.com>.
Hi Till,
Server start up entrypoint log
2018-07-25T12:19:12.268+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--------------------------------------------------------------------------------
2018-07-25T12:19:12.271+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting
StandaloneSessionClusterEntrypoint (Version: <unknown>, Rev:3488f8b,
Date:10.07.2018 @ 11:51:27 GMT)
2018-07-25T12:19:12.271+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO OS current
user: flink
2018-07-25T12:19:18.599+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Current
Hadoop/Kerberos user: flink
2018-07-25T12:19:18.607+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM: OpenJDK
64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-25T12:19:18.607+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Maximum heap
size: 1963 MiBytes
2018-07-25T12:19:18.607+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JAVA_HOME:
/docker-java-home/jre
2018-07-25T12:19:18.615+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop
version: 2.8.1
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM Options:
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xms2048m
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xmx2048m
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Program
Arguments:
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--configDir
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
/opt/flink/conf
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--executionMode
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --host
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Classpath:
/opt/flink/lib/flink-metrics-datadog-1.5.1.jar:/opt/flink/lib/flink-metrics-prometheus-1.5.1.jar:/opt/flink/lib/flink-python_2.11-1.5.1.jar:/opt/flink/lib/flink-s3-fs-hadoop-1.5.1.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/peer-group-transform-all.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.1.jar:::
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--------------------------------------------------------------------------------
2018-07-25T12:19:18.620+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Registered
UNIX signal handlers for [TERM, HUP, INT]
2018-07-25T12:19:18.853+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting
StandaloneSessionClusterEntrypoint.
2018-07-25T12:19:18.854+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install
default filesystem.
2018-07-25T12:19:19.045+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install
security context.
2018-07-25T12:19:19.520+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Initializing
cluster services.
2018-07-25T12:19:19.601+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Trying to
start actor system at flink-jobmanager:6123
2018-07-25T12:19:24.768+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Actor system
started at akka.tcp://flink@flink-jobmanager:6123
Below is the Flink client log file:
docker exec -it flink-jobmanager flink run /fat.jar --input-path
s3a://json-input --output-path s3a://parquet-output
2018-07-25 13:40:30,752 INFO org.apache.flink.client.cli.CliFrontend
-
--------------------------------------------------------------------------------
2018-07-25 13:40:30,756 INFO org.apache.flink.client.cli.CliFrontend
- Starting Command Line Client (Version: <unknown>,
Rev:3488f8b, Date:10.07.2018 @ 11:51:27 GMT)
2018-07-25 13:40:30,756 INFO org.apache.flink.client.cli.CliFrontend
- OS current user: root
2018-07-25 13:40:31,797 INFO org.apache.flink.client.cli.CliFrontend
- Current Hadoop/Kerberos user: root
2018-07-25 13:40:31,797 INFO org.apache.flink.client.cli.CliFrontend
- JVM: OpenJDK 64-Bit Server VM - Oracle Corporation -
1.8/25.171-b11
2018-07-25 13:40:31,797 INFO org.apache.flink.client.cli.CliFrontend
- Maximum heap size: 2667 MiBytes
2018-07-25 13:40:31,797 INFO org.apache.flink.client.cli.CliFrontend
- JAVA_HOME: /docker-java-home/jre
2018-07-25 13:40:31,800 INFO org.apache.flink.client.cli.CliFrontend
- Hadoop version: 2.8.1
2018-07-25 13:40:31,800 INFO org.apache.flink.client.cli.CliFrontend
- JVM Options:
2018-07-25 13:40:31,800 INFO org.apache.flink.client.cli.CliFrontend
-
-Dlog.file=/opt/flink/log/flink--client-d9831b2552d5.log
2018-07-25 13:40:31,800 INFO org.apache.flink.client.cli.CliFrontend
-
-Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties
2018-07-25 13:40:31,800 INFO org.apache.flink.client.cli.CliFrontend
-
-Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
2018-07-25 13:40:31,801 INFO org.apache.flink.client.cli.CliFrontend
- Program Arguments:
2018-07-25 13:40:31,801 INFO org.apache.flink.client.cli.CliFrontend
- run
2018-07-25 13:40:31,801 INFO org.apache.flink.client.cli.CliFrontend
- /peer-group-transform-all.jar
2018-07-25 13:40:31,801 INFO org.apache.flink.client.cli.CliFrontend
- --input-path
2018-07-25 13:40:31,801 INFO org.apache.flink.client.cli.CliFrontend
- s3a://odin-tmp/explore-data/20180711104638/
2018-07-25 13:40:31,801 INFO org.apache.flink.client.cli.CliFrontend
- --output-path
2018-07-25 13:40:31,801 INFO org.apache.flink.client.cli.CliFrontend
- s3a://odin-tmp/transformed/20180712/IDA-1917-1
2018-07-25 13:40:31,801 INFO org.apache.flink.client.cli.CliFrontend
- Classpath:
/opt/flink/lib/flink-metrics-datadog-1.5.1.jar:/opt/flink/lib/flink-metrics-prometheus-1.5.1.jar:/opt/flink/lib/flink-python_2.11-1.5.1.jar:/opt/flink/lib/flink-s3-fs-hadoop-1.5.1.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/peer-group-transform-all.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.1.jar:::
2018-07-25 13:40:31,801 INFO org.apache.flink.client.cli.CliFrontend
-
--------------------------------------------------------------------------------
2018-07-25 13:40:31,806 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:31,807 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:31,807 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:31,807 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:31,807 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:31,807 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: parallelism.default, 1
2018-07-25 13:40:31,807 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: rest.port, 8081
2018-07-25 13:40:31,808 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:31,808 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:31,808 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:31,808 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: env.java.opts,
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:31,808 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: state.backend, filesystem
2018-07-25 13:40:31,808 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:31,809 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: blob.server.port, 6124
2018-07-25 13:40:31,809 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: query.server.port, 6125
2018-07-25 13:40:32,151 INFO
org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user
set to root (auth:SIMPLE)
2018-07-25 13:40:32,190 INFO org.apache.flink.client.cli.CliFrontend
- Running 'run' command.
2018-07-25 13:40:32,197 INFO org.apache.flink.client.cli.CliFrontend
- Building program from JAR file
2018-07-25 13:40:32,363 WARN org.apache.flink.configuration.Configuration
- Config uses deprecated configuration key
'jobmanager.rpc.address' instead of proper key 'rest.address'
2018-07-25 13:40:32,875 INFO org.apache.flink.runtime.rest.RestClient
- Rest client endpoint started.
2018-07-25 13:40:32,879 INFO org.apache.flink.client.cli.CliFrontend
- Starting execution of program
2018-07-25 13:40:32,879 INFO
org.apache.flink.client.program.rest.RestClusterClient - Starting
program in interactive mode (detached: false)
2018-07-25 13:40:32,943 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:32,943 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:32,943 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:32,943 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:32,943 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:32,943 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: parallelism.default, 1
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: rest.port, 8081
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: env.java.opts,
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: state.backend, filesystem
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: blob.server.port, 6124
2018-07-25 13:40:32,945 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: query.server.port, 6125
2018-07-25 13:40:32,950 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:32,950 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:32,950 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:32,950 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:32,950 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:32,950 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: parallelism.default, 1
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: rest.port, 8081
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: env.java.opts,
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: state.backend, filesystem
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:32,952 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: blob.server.port, 6124
2018-07-25 13:40:32,952 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: query.server.port, 6125
2018-07-25 13:40:32,976 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:32,985 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: parallelism.default, 1
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: rest.port, 8081
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: env.java.opts,
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: state.backend, filesystem
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: blob.server.port, 6124
2018-07-25 13:40:32,987 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: query.server.port, 6125
2018-07-25 13:40:33,212 INFO
org.apache.flink.api.java.typeutils.TypeExtractor - class
com.fasterxml.jackson.databind.node.ObjectNode does not contain a getter
for field _children
2018-07-25 13:40:33,212 INFO
org.apache.flink.api.java.typeutils.TypeExtractor - class
com.fasterxml.jackson.databind.node.ObjectNode does not contain a setter
for field _children
2018-07-25 13:40:33,212 INFO
org.apache.flink.api.java.typeutils.TypeExtractor - Class class
com.fasterxml.jackson.databind.node.ObjectNode cannot be used as a POJO
type because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.
2018-07-25 13:40:35,676 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: parallelism.default, 1
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: rest.port, 8081
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: env.java.opts,
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: state.backend, filesystem
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: blob.server.port, 6124
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: query.server.port, 6125
2018-07-25 13:40:35,738 INFO
org.apache.flink.api.java.ExecutionEnvironment - The job has
6 registered types and 0 default Kryo serializers
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: parallelism.default, 1
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: rest.port, 8081
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:53,673 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:53,673 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: env.java.opts,
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:53,673 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: state.backend, filesystem
2018-07-25 13:40:53,673 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:53,673 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: blob.server.port, 6124
2018-07-25 13:40:53,673 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: query.server.port, 6125
2018-07-25 13:40:53,765 INFO
org.apache.flink.client.program.rest.RestClusterClient - Submitting
job dc8565764f025be19b89ee98c1a398f6 (detached: false).
2018-07-25 13:40:58,885 INFO org.apache.flink.runtime.rest.RestClient
- Shutting down rest endpoint.
2018-07-25 13:40:58,887 INFO org.apache.flink.runtime.rest.RestClient
- Rest endpoint shutdown complete.
2018-07-25 13:40:58,890 ERROR org.apache.flink.client.cli.CliFrontend
- Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Could not
retrieve the execution result.
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:257)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:452)
at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
at
com.sailpoint.ida.data.jobs.peergrouptransform.PeerGroupTransformJob.main(PeerGroupTransformJob.java:116)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
to submit JobGraph.
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
complete the operation. Exception is not retryable.
at
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 12 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
Could not complete the operation. Exception is not retryable.
... 10 more
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.rest.util.RestClientException: [Job submission
failed.]
at
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 4 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job
submission failed.]
at
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:309)
at
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:293)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
... 5 more
Thanks, Alex
On Wed, Jul 25, 2018 at 2:22 AM Till Rohrmann <tr...@apache.org> wrote:
> Hi Alex,
>
> could you share with us the full logs of the client and the cluster
> entrypoint? That would be tremendously helpful.
>
> Cheers,
> Till
>
> On Wed, Jul 25, 2018 at 4:08 AM vino yang <ya...@gmail.com> wrote:
>
>> Hi Alex,
>>
>> Is it possible that the data has been corrupted?
>>
>> Or have you confirmed that the Avro version is consistent across the
>> different Flink versions?
>>
>> Also, if you don't upgrade Flink and keep using version 1.3.1, does the
>> job work again?
>>
>> Thanks, vino.
>>
>>
>> 2018-07-25 8:32 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>>
>>> Vino,
>>>
>>> I upgraded Flink to use Hadoop 2.8.1:
>>>
>>> $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
>>> entrypoint | grep 'Hadoop version'
>>> 2018-07-25T00:19:46.142+0000
>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop
>>> version: 2.8.1
>>>
>>> but the job still fails to start.
>>>
>>> Ideas?
>>>
>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>>> d84cccd3bffcba1f243352a5e5ef99a9.
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>> at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>> at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>> at
>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>> ... 4 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>>> not set up JobManager
>>> at
>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>>> ... 21 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
>>> initialize task 'DataSink
>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)':
>>> Deserializing the OutputFormat
>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>>> failed: unread block data
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>>> at
>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>>> ... 26 more
>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>>> failed: unread block data
>>> at
>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>> ... 31 more
>>> Caused by: java.lang.IllegalStateException: unread block data
>>> at
>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>> at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>>> at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>>> at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>>> at
>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>>> at
>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>> at
>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>> ... 32 more
>>>
>>>
>>> On Tue, Jul 24, 2018 at 10:32 AM vino yang <ya...@gmail.com>
>>> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> Based on your log information, the likely cause is a Hadoop version
>>>> mismatch. To rule out an exception caused by differing Hadoop versions, I
>>>> suggest you match the Hadoop version on both sides.
>>>>
>>>> You can :
>>>>
>>>> 1. Upgrade the Hadoop version that the Flink cluster depends on; Flink's
>>>> official website provides a binary bundled with Hadoop 2.8.[1]
>>>> 2. Downgrade your fat jar's Hadoop client dependency to match the Hadoop
>>>> version of the Flink cluster.
>>>>
>>>> [1]:
>>>> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz
>>>>
>>>> Thanks, vino.
>>>>
>>>> 2018-07-24 22:59 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> Thanks for responding. Below are the entrypoint logs. One thing I noticed
>>>>> is "Hadoop version: 2.7.3". My fat jar is built with the Hadoop 2.8 client.
>>>>> Could that be the reason for the error? If so, how can I use the same Hadoop
>>>>> version 2.8 on the Flink server side? BTW, the job runs fine locally reading from
>>>>> the same s3a buckets when executed using createLocalEnvironment via java
>>>>> -jar my-fat.jar --input s3a://foo --output s3a://bar
>>>>>
>>>>> Regarding the Java version: the job is submitted via the Flink UI, so it
>>>>> should not be a problem.
>>>>>
>>>>> Thanks a lot in advance.
>>>>>
>>>>> 2018-07-24T12:09:38.083+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> --------------------------------------------------------------------------------
>>>>> 2018-07-24T12:09:38.085+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting
>>>>> StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108,
>>>>> Date:24.05.2018 @ 14:54:44 UTC)
>>>>> 2018-07-24T12:09:38.085+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO OS current
>>>>> user: flink
>>>>> 2018-07-24T12:09:38.844+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Current
>>>>> Hadoop/Kerberos user: flink
>>>>> 2018-07-24T12:09:38.844+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM: OpenJDK
>>>>> 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
>>>>> 2018-07-24T12:09:38.844+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Maximum heap
>>>>> size: 1963 MiBytes
>>>>> 2018-07-24T12:09:38.844+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JAVA_HOME:
>>>>> /docker-java-home/jre
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop
>>>>> version: 2.7.3
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM Options:
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xms2048m
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xmx2048m
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> -Dcom.amazonaws.sdk.disableCertChecking
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Program
>>>>> Arguments:
>>>>> 2018-07-24T12:09:38.852+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> --configDir
>>>>> 2018-07-24T12:09:38.852+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> /opt/flink/conf
>>>>> 2018-07-24T12:09:38.852+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> --executionMode
>>>>> 2018-07-24T12:09:38.853+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
>>>>> 2018-07-24T12:09:38.853+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --host
>>>>> 2018-07-24T12:09:38.853+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
>>>>> 2018-07-24T12:09:38.853+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Classpath:
>>>>> /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
>>>>> 2018-07-24T12:09:38.853+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> --------------------------------------------------------------------------------
>>>>> 2018-07-24T12:09:38.854+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Registered
>>>>> UNIX signal handlers for [TERM, HUP, INT]
>>>>> 2018-07-24T12:09:38.895+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting
>>>>> StandaloneSessionClusterEntrypoint.
>>>>> 2018-07-24T12:09:38.895+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install
>>>>> default filesystem.
>>>>> 2018-07-24T12:09:38.927+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install
>>>>> security context.
>>>>> 2018-07-24T12:09:39.034+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Initializing
>>>>> cluster services.
>>>>> 2018-07-24T12:09:39.059+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Trying to
>>>>> start actor system at flink-jobmanager:6123
>>>>> 2018-07-24T12:09:40.335+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Actor system
>>>>> started at akka.tcp://flink@flink-jobmanager:6123
>>>>>
>>>>> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Alex,
>>>>>>
>>>>>> I'm not entirely sure what causes this problem because this is the
>>>>>> first time I have seen it.
>>>>>>
>>>>>> First question would be if the problem also arises if using a
>>>>>> different Hadoop version.
>>>>>>
>>>>>> Are you using the same Java versions on the client as well as on the
>>>>>> server?
>>>>>>
>>>>>> Could you provide us with the cluster entrypoint logs?
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Trying to migrate an existing job that runs fine on Flink 1.3.1 to
>>>>>>> Flink 1.5 and getting a weird exception.
>>>>>>>
>>>>>>> The job reads JSON from s3a and writes Parquet files to s3a with an Avro
>>>>>>> model. The job is an uber jar built with hadoop-aws-2.8.0 in order to have
>>>>>>> access to the S3AFileSystem class.
>>>>>>>
>>>>>>> Fails here
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>>>>>>> with
>>>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>>> failed: unread block data
>>>>>>>
>>>>>>> To be exact it fails right on that line.
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>>>>>>
>>>>>>> Not sure how to resolve this problem. Looking for advice. Let me
>>>>>>> know if more info is needed. The full stack trace is below. Thanks.
>>>>>>>
>>>>>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>>> at
>>>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
>>>>>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>>>>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>>>>>>> at
>>>>>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>>>>> at
>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>>>>>> at
>>>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>>>>>> at
>>>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>>> at
>>>>>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>>>>> at
>>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>>>>> at
>>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>>> at
>>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>>> at
>>>>>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>>>>> at
>>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>>>>>> at
>>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>>>>>>> at
>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>> at
>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>> at
>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>> at
>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> Caused by: java.util.concurrent.CompletionException:
>>>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>>>>> ... 29 more
>>>>>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit
>>>>>>> job 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>>> at
>>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>> at
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>>>> at
>>>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>> ... 4 more
>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>>> Could not set up JobManager
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>>>>>>> ... 21 more
>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>>> Cannot initialize task 'DataSink
>>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
>>>>>>> Deserializing the OutputFormat
>>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>>> failed: unread block data
>>>>>>> at
>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>>>>>>> ... 26 more
>>>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>>> failed: unread block data
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>>>>>> ... 31 more
>>>>>>> Caused by: java.lang.IllegalStateException: unread block data
>>>>>>> at
>>>>>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>>>>>> at
>>>>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>>>>>>> at
>>>>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>>>>>> at
>>>>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>>>>> at
>>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>>>>>>> at
>>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>>>>>>> at
>>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>>>>>>> at
>>>>>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>>>>>> ... 32 more
>>>>>>>
>>>>>>>
>>>>
>>
Re: Flink 1.5 batch job fails to start
Posted by Till Rohrmann <tr...@apache.org>.
Hi Alex,
could you share with us the full logs of the client and the cluster
entrypoint? That would be tremendously helpful.
Cheers,
Till
On Wed, Jul 25, 2018 at 4:08 AM vino yang <ya...@gmail.com> wrote:
> Hi Alex,
>
> Is it possible that the data has been corrupted?
>
> Or have you confirmed that the Avro version is consistent across the
> different Flink versions?
>
> Also, if you don't upgrade Flink and still use version 1.3.1, can it be
> recovered?
>
> Thanks, vino.
>
>
> 2018-07-25 8:32 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>
>> Vino,
>>
>> Upgraded flink to Hadoop 2.8.1
>>
>> $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
>> entrypoint | grep 'Hadoop version'
>> 2018-07-25T00:19:46.142+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop
>> version: 2.8.1
>>
>> but job still fails to start
>>
>> Ideas?
>>
>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>> d84cccd3bffcba1f243352a5e5ef99a9.
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> ... 4 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>> ... 21 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
>> initialize task 'DataSink
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)':
>> Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>> ... 26 more
>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>> ... 31 more
>> Caused by: java.lang.IllegalStateException: unread block data
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>> at
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>> at
>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>> ... 32 more
>>
>>
>> On Tue, Jul 24, 2018 at 10:32 AM vino yang <ya...@gmail.com> wrote:
>>
>>> Hi Alex,
>>>
>>> Based on your log information, the likely cause is a Hadoop version
>>> mismatch. To rule out an exception caused by differing Hadoop versions, I
>>> suggest you match the Hadoop version on both sides.
>>>
>>> You can :
>>>
>>> 1. Upgrade the Hadoop version that the Flink cluster depends on; Flink's
>>> official website provides a binary bundled with Hadoop 2.8.[1]
>>> 2. Downgrade your fat jar's Hadoop client dependency to match the Hadoop
>>> version of the Flink cluster.
>>>
>>> [1]:
>>> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz
>>>
>>> Thanks, vino.
>>>
>>> 2018-07-24 22:59 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>>>
>>>> Hi Till,
>>>>
>>>> Thanks for responding. Below are the entrypoint logs. One thing I noticed
>>>> is "Hadoop version: 2.7.3". My fat jar is built with the Hadoop 2.8 client.
>>>> Could that be the reason for the error? If so, how can I use the same Hadoop
>>>> version 2.8 on the Flink server side? BTW, the job runs fine locally reading from
>>>> the same s3a buckets when executed using createLocalEnvironment via java
>>>> -jar my-fat.jar --input s3a://foo --output s3a://bar
>>>>
>>>> Regarding the Java version: the job is submitted via the Flink UI, so it should
>>>> not be a problem.
>>>>
>>>> Thanks a lot in advance.
>>>>
>>>> 2018-07-24T12:09:38.083+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --------------------------------------------------------------------------------
>>>> 2018-07-24T12:09:38.085+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting
>>>> StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108,
>>>> Date:24.05.2018 @ 14:54:44 UTC)
>>>> 2018-07-24T12:09:38.085+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO OS current
>>>> user: flink
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Current
>>>> Hadoop/Kerberos user: flink
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM: OpenJDK
>>>> 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Maximum heap
>>>> size: 1963 MiBytes
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JAVA_HOME:
>>>> /docker-java-home/jre
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop
>>>> version: 2.7.3
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM Options:
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xms2048m
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xmx2048m
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dcom.amazonaws.sdk.disableCertChecking
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Program
>>>> Arguments:
>>>> 2018-07-24T12:09:38.852+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --configDir
>>>> 2018-07-24T12:09:38.852+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> /opt/flink/conf
>>>> 2018-07-24T12:09:38.852+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --executionMode
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --host
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Classpath:
>>>> /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --------------------------------------------------------------------------------
>>>> 2018-07-24T12:09:38.854+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Registered
>>>> UNIX signal handlers for [TERM, HUP, INT]
>>>> 2018-07-24T12:09:38.895+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting
>>>> StandaloneSessionClusterEntrypoint.
>>>> 2018-07-24T12:09:38.895+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install
>>>> default filesystem.
>>>> 2018-07-24T12:09:38.927+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install
>>>> security context.
>>>> 2018-07-24T12:09:39.034+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Initializing
>>>> cluster services.
>>>> 2018-07-24T12:09:39.059+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Trying to
>>>> start actor system at flink-jobmanager:6123
>>>> 2018-07-24T12:09:40.335+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Actor system
>>>> started at akka.tcp://flink@flink-jobmanager:6123
>>>>
>>>> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Alex,
>>>>>
>>>>> I'm not entirely sure what causes this problem because this is the first
>>>>> time I have seen it.
>>>>>
>>>>> First question would be if the problem also arises if using a
>>>>> different Hadoop version.
>>>>>
>>>>> Are you using the same Java versions on the client as well as on the
>>>>> server?
>>>>>
>>>>> Could you provide us with the cluster entrypoint logs?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Trying to migrate an existing job that runs fine on Flink 1.3.1 to Flink
>>>>>> 1.5 and getting a weird exception.
>>>>>>
>>>>>> The job reads JSON from s3a and writes Parquet files to s3a with an Avro
>>>>>> model. The job is an uber jar built with hadoop-aws-2.8.0 in order to have
>>>>>> access to the S3AFileSystem class.
>>>>>>
>>>>>> Fails here
>>>>>>
>>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>>>>>> with
>>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>> failed: unread block data
>>>>>>
>>>>>> To be exact it fails right on that line.
>>>>>>
>>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>>>>>
>>>>>> Not sure how to resolve this problem. Looking for advice. Let me
>>>>>> know if more info is needed. Full stack trace is below. Thanks.
>>>>>>
>>>>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>> at
>>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>>> at
>>>>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
>>>>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>>>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>> at
>>>>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>>>>>> at
>>>>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>>>> at
>>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>>>>> at
>>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>>>>> at
>>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>> at
>>>>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>>>>> at
>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> Caused by: java.util.concurrent.CompletionException:
>>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>>>> ... 29 more
>>>>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>>> at
>>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>> ... 4 more
>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>> Could not set up JobManager
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>>>>>> ... 21 more
>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>> Cannot initialize task 'DataSink
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
>>>>>> Deserializing the OutputFormat
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>> failed: unread block data
>>>>>> at
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>>>>> at
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>>>>>> ... 26 more
>>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>> failed: unread block data
>>>>>> at
>>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>>>>> at
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>>>>> ... 31 more
>>>>>> Caused by: java.lang.IllegalStateException: unread block data
>>>>>> at
>>>>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>>>>> at
>>>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>>>>>> at
>>>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>>>>> at
>>>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>>>>> ... 32 more
>>>>>>
>>>>>>
>>>
>
Re: Flink 1.5 batch job fails to start
Posted by vino yang <ya...@gmail.com>.
Hi Alex,
Is it possible that the data has been corrupted?
Or have you confirmed that the Avro version is consistent across the two
Flink versions?
Also, if you don't upgrade Flink and stay on version 1.3.1, does the job
still run?
Thanks, vino.
2018-07-25 8:32 GMT+08:00 Alex Vinnik <al...@gmail.com>:
> Vino,
>
> Upgraded flink to Hadoop 2.8.1
>
> $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
> entrypoint | grep 'Hadoop version'
> 2018-07-25T00:19:46.142+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Hadoop version: 2.8.1
>
> but job still fails to start
>
> Ideas?
>
> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
> d84cccd3bffcba1f243352a5e5ef99a9.
> at org.apache.flink.runtime.dispatcher.Dispatcher.
> submitJob(Dispatcher.java:254)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
> AkkaRpcActor.java:247)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.
> handleRpcMessage(AkkaRpcActor.java:162)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(
> FencedAkkaRpcActor.java:70)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(
> AkkaRpcActor.java:142)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.
> onReceive(FencedAkkaRpcActor.java:40)
> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(
> UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> ... 4 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
> init>(JobManagerRunner.java:169)
> at org.apache.flink.runtime.dispatcher.Dispatcher$
> DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
> at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(
> Dispatcher.java:287)
> at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(
> Dispatcher.java:277)
> at org.apache.flink.runtime.dispatcher.Dispatcher.
> persistAndRunJob(Dispatcher.java:262)
> at org.apache.flink.runtime.dispatcher.Dispatcher.
> submitJob(Dispatcher.java:249)
> ... 21 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
> initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.
> HadoopOutputFormat@a3123a9)': Deserializing the OutputFormat
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
> failed: unread block data
> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
> buildGraph(ExecutionGraphBuilder.java:220)
> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
> buildGraph(ExecutionGraphBuilder.java:100)
> at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(
> JobMaster.java:1150)
> at org.apache.flink.runtime.jobmaster.JobMaster.
> createAndRestoreExecutionGraph(JobMaster.java:1130)
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
> init>(JobManagerRunner.java:151)
> ... 26 more
> Caused by: java.lang.Exception: Deserializing the OutputFormat
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
> failed: unread block data
> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.
> initializeOnMaster(OutputFormatVertex.java:63)
> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
> buildGraph(ExecutionGraphBuilder.java:216)
> ... 31 more
> Caused by: java.lang.IllegalStateException: unread block data
> at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(
> ObjectInputStream.java:2781)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2285)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:2067)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:488)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:475)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:463)
> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
> InstantiationUtil.java:424)
> at org.apache.flink.runtime.operators.util.TaskConfig.
> getStubWrapper(TaskConfig.java:288)
> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.
> initializeOnMaster(OutputFormatVertex.java:60)
> ... 32 more
>
>
> On Tue, Jul 24, 2018 at 10:32 AM vino yang <ya...@gmail.com> wrote:
>
>> Hi Alex,
>>
>> Based on your log information, the likely cause is a Hadoop version
>> mismatch. To rule out that the exception comes from differing Hadoop
>> versions, I suggest you match the Hadoop version on both sides.
>>
>> You can :
>>
>> 1. Upgrade the Hadoop version that the Flink cluster depends on; Flink's
>> official website provides a binary bundled with Hadoop 2.8.[1]
>> 2. Downgrade your fat jar's Hadoop client dependency to match the
>> Hadoop version of the Flink cluster.
>>
>> [1]: http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz
>>
>> Thanks, vino.
>>
>> 2018-07-24 22:59 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>>
>>> Hi Till,
>>>
>>> Thanks for responding. Below are the entrypoint logs. One thing I noticed
>>> is "Hadoop version: 2.7.3". My fat jar is built with the Hadoop 2.8 client.
>>> Could that be the reason for the error? If so, how can I use the same Hadoop
>>> version 2.8 on the Flink server side? BTW, the job runs fine locally reading
>>> from the same s3a buckets when executed using createLocalEnvironment via java
>>> -jar my-fat.jar --input s3a://foo --output s3a://bar
>>>
>>> Regarding the Java version: the job is submitted via the Flink UI, so it
>>> should not be a problem.
>>>
>>> Thanks a lot in advance.
>>>
>>> 2018-07-24T12:09:38.083+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO ------------------------------------------------------------
>>> --------------------
>>> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0,
>>> Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
>>> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO OS current user: flink
>>> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO Current Hadoop/Kerberos user: flink
>>> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
>>> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO Maximum heap size: 1963 MiBytes
>>> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO JAVA_HOME: /docker-java-home/jre
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO Hadoop version: 2.7.3
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO JVM Options:
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO -Xms2048m
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO -Xmx2048m
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.
>>> disableCertChecking
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO -Dcom.amazonaws.sdk.disableCertChecking
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,
>>> address=5015
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.
>>> properties
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO -Dlogback.configurationFile=file:/opt/flink/conf/logback-
>>> console.xml
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO Program Arguments:
>>> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO --configDir
>>> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO /opt/flink/conf
>>> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO --executionMode
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO cluster
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO --host
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO cluster
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO Classpath: /opt/flink/lib/flink-metrics-
>>> datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.
>>> jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/
>>> flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/
>>> flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.
>>> 7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO ------------------------------------------------------------
>>> --------------------
>>> 2018-07-24T12:09:38.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO Registered UNIX signal handlers for [TERM, HUP, INT]
>>> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO Starting StandaloneSessionClusterEntrypoint.
>>> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO Install default filesystem.
>>> 2018-07-24T12:09:38.927+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO Install security context.
>>> 2018-07-24T12:09:39.034+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO Initializing cluster services.
>>> 2018-07-24T12:09:39.059+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO Trying to start actor system at flink-jobmanager:6123
>>> 2018-07-24T12:09:40.335+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO Actor system started at akka.tcp://flink@flink-jobmanager:6123
>>>
>>> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> I'm not entirely sure what causes this problem because it is the first
>>>> time I see it.
>>>>
>>>> First question would be if the problem also arises if using a different
>>>> Hadoop version.
>>>>
>>>> Are you using the same Java versions on the client as well as on the
>>>> server?
>>>>
>>>> Could you provide us with the cluster entrypoint logs?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com>
>>>> wrote:
>>>>
>>>>> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink
>>>>> 1.5 and getting a weird exception.
>>>>>
>>>>> Job reads json from s3a and writes parquet files to s3a with avro
>>>>> model. Job is uber jar file built with hadoop-aws-2.8.0 in order to have
>>>>> access to S3AFileSystem class.
>>>>>
>>>>> Fails here
>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>>>>> with
>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>> (org.apache.flink.api.java.hadoop.mapreduce.
>>>>> HadoopOutputFormat@5ebe8168) failed: unread block data
>>>>>
>>>>> To be exact it fails right on that line.
>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>>>>
>>>>> Not sure how to resolve this problem. Looking for advice. Let me
>>>>> know if more info is needed. Full stack trace is below. Thanks.
>>>>>
>>>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$
>>>>> handleRequest$3(JarRunHandler.java:141)
>>>>> at java.util.concurrent.CompletableFuture.uniExceptionally(
>>>>> CompletableFuture.java:870)
>>>>> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(
>>>>> CompletableFuture.java:852)
>>>>> at java.util.concurrent.CompletableFuture.postComplete(
>>>>> CompletableFuture.java:474)
>>>>> at java.util.concurrent.CompletableFuture.completeExceptionally(
>>>>> CompletableFuture.java:1977)
>>>>> at org.apache.flink.runtime.concurrent.FutureUtils$1.
>>>>> onComplete(FutureUtils.java:811)
>>>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>> at org.apache.flink.runtime.concurrent.Executors$
>>>>> DirectExecutionContext.execute(Executors.java:83)
>>>>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.
>>>>> scala:44)
>>>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(
>>>>> Promise.scala:252)
>>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$
>>>>> pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$
>>>>> pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(
>>>>> BatchingExecutor.scala:55)
>>>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
>>>>> apply$mcV$sp(BatchingExecutor.scala:91)
>>>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
>>>>> apply(BatchingExecutor.scala:91)
>>>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
>>>>> apply(BatchingExecutor.scala:91)
>>>>> at scala.concurrent.BlockContext$.withBlockContext(
>>>>> BlockContext.scala:72)
>>>>> at akka.dispatch.BatchingExecutor$BlockableBatch.run(
>>>>> BatchingExecutor.scala:90)
>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
>>>>> AbstractDispatcher.scala:415)
>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(
>>>>> ForkJoinTask.java:260)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>>>>> runTask(ForkJoinPool.java:1339)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>>>> ForkJoinPool.java:1979)
>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>>>>> ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.util.concurrent.CompletionException:
>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>> at java.util.concurrent.CompletableFuture.encodeRelay(
>>>>> CompletableFuture.java:326)
>>>>> at java.util.concurrent.CompletableFuture.completeRelay(
>>>>> CompletableFuture.java:338)
>>>>> at java.util.concurrent.CompletableFuture.uniRelay(
>>>>> CompletableFuture.java:911)
>>>>> at java.util.concurrent.CompletableFuture$UniRelay.
>>>>> tryFire(CompletableFuture.java:899)
>>>>> ... 29 more
>>>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>>>> submitJob(Dispatcher.java:254)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(
>>>>> NativeMethodAccessorImpl.java:62)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>>>> DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
>>>>> AkkaRpcActor.java:247)
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.
>>>>> handleRpcMessage(AkkaRpcActor.java:162)
>>>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.
>>>>> handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(
>>>>> AkkaRpcActor.java:142)
>>>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.
>>>>> onReceive(FencedAkkaRpcActor.java:40)
>>>>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(
>>>>> UntypedActor.scala:165)
>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>> ... 4 more
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>> Could not set up JobManager
>>>>> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
>>>>> init>(JobManagerRunner.java:169)
>>>>> at org.apache.flink.runtime.dispatcher.Dispatcher$
>>>>> DefaultJobManagerRunnerFactory.createJobManagerRunner(
>>>>> Dispatcher.java:885)
>>>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>>>> createJobManagerRunner(Dispatcher.java:287)
>>>>> at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(
>>>>> Dispatcher.java:277)
>>>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>>>> persistAndRunJob(Dispatcher.java:262)
>>>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>>>> submitJob(Dispatcher.java:249)
>>>>> ... 21 more
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>> Cannot initialize task 'DataSink (org.apache.flink.api.java.
>>>>> hadoop.mapreduce.HadoopOutputFormat@5ebe8168)': Deserializing the
>>>>> OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.
>>>>> HadoopOutputFormat@5ebe8168) failed: unread block data
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
>>>>> buildGraph(ExecutionGraphBuilder.java:220)
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
>>>>> buildGraph(ExecutionGraphBuilder.java:100)
>>>>> at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(
>>>>> JobMaster.java:1150)
>>>>> at org.apache.flink.runtime.jobmaster.JobMaster.
>>>>> createAndRestoreExecutionGraph(JobMaster.java:1130)
>>>>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(
>>>>> JobMaster.java:298)
>>>>> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
>>>>> init>(JobManagerRunner.java:151)
>>>>> ... 26 more
>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>> (org.apache.flink.api.java.hadoop.mapreduce.
>>>>> HadoopOutputFormat@5ebe8168) failed: unread block data
>>>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.
>>>>> initializeOnMaster(OutputFormatVertex.java:63)
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
>>>>> buildGraph(ExecutionGraphBuilder.java:216)
>>>>> ... 31 more
>>>>> Caused by: java.lang.IllegalStateException: unread block data
>>>>> at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(
>>>>> ObjectInputStream.java:2781)
>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>>>> at java.io.ObjectInputStream.defaultReadFields(
>>>>> ObjectInputStream.java:2285)
>>>>> at java.io.ObjectInputStream.readSerialData(
>>>>> ObjectInputStream.java:2209)
>>>>> at java.io.ObjectInputStream.readOrdinaryObject(
>>>>> ObjectInputStream.java:2067)
>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>>> InstantiationUtil.java:488)
>>>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>>> InstantiationUtil.java:475)
>>>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>>> InstantiationUtil.java:463)
>>>>> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
>>>>> InstantiationUtil.java:424)
>>>>> at org.apache.flink.runtime.operators.util.TaskConfig.
>>>>> getStubWrapper(TaskConfig.java:288)
>>>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.
>>>>> initializeOnMaster(OutputFormatVertex.java:60)
>>>>> ... 32 more
>>>>>
>>>>>
>>
Re: Flink 1.5 batch job fails to start
Posted by Alex Vinnik <al...@gmail.com>.
Vino,
Upgraded flink to Hadoop 2.8.1
$ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
entrypoint | grep 'Hadoop version'
2018-07-25T00:19:46.142+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop
version: 2.8.1
but job still fails to start
Ideas?
Caused by: org.apache.flink.util.FlinkException: Failed to submit job
d84cccd3bffcba1f243352a5e5ef99a9.
at
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
set up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at
org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at
org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at
org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
initialize task 'DataSink
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)':
Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
failed: unread block data
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
failed: unread block data
at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more
On Tue, Jul 24, 2018 at 10:32 AM vino yang <ya...@gmail.com> wrote:
> Hi Alex,
>
> Based on your log information, the potential reason is Hadoop version. To
> troubleshoot the exception comes from different Hadoop version. I suggest
> you match the both side of Hadoop version.
>
> You can :
>
> 1. Upgrade the Hadoop version which Flink Cluster depends on, Flink's
> official website provides the binary binding Hadoop 2.8.[1]
> 2. downgrade your fat jar's Hadoop client dependency's version to match
> Flink Cluster's hadoop dependency's version.
>
> [1]:
> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz
>
> Thanks, vino.
>
> 2018-07-24 22:59 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>
>> Hi Till,
>>
>> Thanks for responding. Below is entrypoint logs. One thing I noticed that
>> "Hadoop version: 2.7.3". My fat jar is built with hadoop 2.8 client. Could
>> it be a reason for that error? If so how can i use same hadoop version 2.8
>> on flink server side? BTW job runs fine locally reading from the same s3a
>> buckets when executed using createLocalEnvironment via java -jar my-fat.jar
>> --input s3a://foo --output s3a://bar
>>
>> Regarding java version. The job is submitted via Flink UI, so it should
>> not be a problem.
>>
>> Thanks a lot in advance.
>>
>> 2018-07-24T12:09:38.083+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> --------------------------------------------------------------------------------
>> 2018-07-24T12:09:38.085+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting
>> StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108,
>> Date:24.05.2018 @ 14:54:44 UTC)
>> 2018-07-24T12:09:38.085+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO OS current
>> user: flink
>> 2018-07-24T12:09:38.844+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Current
>> Hadoop/Kerberos user: flink
>> 2018-07-24T12:09:38.844+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM: OpenJDK
>> 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
>> 2018-07-24T12:09:38.844+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Maximum heap
>> size: 1963 MiBytes
>> 2018-07-24T12:09:38.844+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JAVA_HOME:
>> /docker-java-home/jre
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop
>> version: 2.7.3
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM Options:
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xms2048m
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xmx2048m
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> -Dcom.amazonaws.sdk.disableCertChecking
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Program
>> Arguments:
>> 2018-07-24T12:09:38.852+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> --configDir
>> 2018-07-24T12:09:38.852+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> /opt/flink/conf
>> 2018-07-24T12:09:38.852+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> --executionMode
>> 2018-07-24T12:09:38.853+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
>> 2018-07-24T12:09:38.853+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --host
>> 2018-07-24T12:09:38.853+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
>> 2018-07-24T12:09:38.853+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Classpath:
>> /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
>> 2018-07-24T12:09:38.853+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> --------------------------------------------------------------------------------
>> 2018-07-24T12:09:38.854+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Registered
>> UNIX signal handlers for [TERM, HUP, INT]
>> 2018-07-24T12:09:38.895+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting
>> StandaloneSessionClusterEntrypoint.
>> 2018-07-24T12:09:38.895+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install
>> default filesystem.
>> 2018-07-24T12:09:38.927+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install
>> security context.
>> 2018-07-24T12:09:39.034+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Initializing
>> cluster services.
>> 2018-07-24T12:09:39.059+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Trying to
>> start actor system at flink-jobmanager:6123
>> 2018-07-24T12:09:40.335+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Actor system
>> started at akka.tcp://flink@flink-jobmanager:6123
>>
>> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Alex,
>>>
>>> I'm not entirely sure what causes this problem because it is the first
>>> time I see it.
>>>
>>> First question would be if the problem also arises if using a different
>>> Hadoop version.
>>>
>>> Are you using the same Java versions on the client as well as on the
>>> server?
>>>
>>> Could you provide us with the cluster entrypoint logs?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com>
>>> wrote:
>>>
>>>> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink
>>>> 1.5 and getting a weird exception.
>>>>
>>>> Job reads json from s3a and writes parquet files to s3a with avro
>>>> model. Job is uber jar file built with hadoop-aws-2.8.0 in order to have
>>>> access to S3AFileSystem class.
>>>>
>>>> Fails here
>>>>
>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>>>> with
>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>> failed: unread block data
>>>>
>>>> To be exact it fails right on that line.
>>>>
>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>>>
>>>> Not sure how to resolve this problem. Looking for an advice. Let me
>>>> know if more info is needed. Full stack is below. Thanks.
>>>>
>>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>> at
>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
>>>> at
>>>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>> at
>>>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>> at
>>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>> at
>>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>> at
>>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
>>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> at
>>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>>>> at
>>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>> at
>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>>> at
>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>>> at
>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> at
>>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> at
>>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>>> at
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.util.concurrent.CompletionException:
>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>> at
>>>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>> at
>>>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>> at
>>>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>> at
>>>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>> ... 29 more
>>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>> at
>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>> ... 4 more
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>>>> not set up JobManager
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>>>> ... 21 more
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>> Cannot initialize task 'DataSink
>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
>>>> Deserializing the OutputFormat
>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>> failed: unread block data
>>>> at
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>>> at
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>>>> ... 26 more
>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>> failed: unread block data
>>>> at
>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>>> at
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>>> ... 31 more
>>>> Caused by: java.lang.IllegalStateException: unread block data
>>>> at
>>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>>> at
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>>> at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>>>> at
>>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>>> at
>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>>> ... 32 more
>>>>
>>>>
>
Re: Flink 1.5 batch job fails to start
Posted by vino yang <ya...@gmail.com>.
Hi Alex,
Based on your log information, the likely cause is a Hadoop version mismatch. To
check whether the exception comes from the differing Hadoop versions, I suggest
you use the same Hadoop version on both sides (the Flink cluster and your fat jar).
You can either:
1. Upgrade the Hadoop version that the Flink cluster depends on; Flink's
official website provides a binary bundled with Hadoop 2.8 [1], or
2. Downgrade the Hadoop client dependency in your fat jar to match the
Hadoop version that the Flink cluster depends on.
[1]:
http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz
Thanks, vino.
2018-07-24 22:59 GMT+08:00 Alex Vinnik <al...@gmail.com>:
> Hi Till,
>
> Thanks for responding. Below is entrypoint logs. One thing I noticed that
> "Hadoop version: 2.7.3". My fat jar is built with hadoop 2.8 client. Could
> it be a reason for that error? If so how can i use same hadoop version 2.8
> on flink server side? BTW job runs fine locally reading from the same s3a
> buckets when executed using createLocalEnvironment via java -jar my-fat.jar
> --input s3a://foo --output s3a://bar
>
> Regarding java version. The job is submitted via Flink UI, so it should
> not be a problem.
>
> Thanks a lot in advance.
>
> 2018-07-24T12:09:38.083+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO ------------------------------------------------------------
> --------------------
> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0,
> Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO OS current user: flink
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Current Hadoop/Kerberos user: flink
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Maximum heap size: 1963 MiBytes
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO JAVA_HOME: /docker-java-home/jre
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Hadoop version: 2.7.3
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO JVM Options:
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO -Xms2048m
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO -Xmx2048m
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.
> disableCertChecking
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO -Dcom.amazonaws.sdk.disableCertChecking
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,
> address=5015
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.
> properties
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO -Dlogback.configurationFile=file:/opt/flink/conf/logback-
> console.xml
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Program Arguments:
> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO --configDir
> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO /opt/flink/conf
> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO --executionMode
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO cluster
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO --host
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO cluster
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Classpath: /opt/flink/lib/flink-metrics-
> datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.
> jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/
> flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/
> flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.
> 7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO ------------------------------------------------------------
> --------------------
> 2018-07-24T12:09:38.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Registered UNIX signal handlers for [TERM, HUP, INT]
> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Starting StandaloneSessionClusterEntrypoint.
> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Install default filesystem.
> 2018-07-24T12:09:38.927+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Install security context.
> 2018-07-24T12:09:39.034+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Initializing cluster services.
> 2018-07-24T12:09:39.059+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Trying to start actor system at flink-jobmanager:6123
> 2018-07-24T12:09:40.335+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO Actor system started at akka.tcp://flink@flink-jobmanager:6123
>
> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Alex,
>>
>> I'm not entirely sure what causes this problem because it is the first
>> time I see it.
>>
>> First question would be if the problem also arises if using a different
>> Hadoop version.
>>
>> Are you using the same Java versions on the client as well as on the
>> server?
>>
>> Could you provide us with the cluster entrypoint logs?
>>
>> Cheers,
>> Till
>>
>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com> wrote:
>>
>>> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink
>>> 1.5 and getting a weird exception.
>>>
>>> Job reads json from s3a and writes parquet files to s3a with avro model.
>>> Job is uber jar file built with hadoop-aws-2.8.0 in order to have access to
>>> S3AFileSystem class.
>>>
>>> Fails here
>>> https://github.com/apache/flink/blob/release-1.5.0/
>>> flink-runtime/src/main/java/org/apache/flink/runtime/
>>> operators/util/TaskConfig.java#L288
>>> with
>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>> failed: unread block data
>>>
>>> To be exact it fails right on that line.
>>> https://github.com/apache/flink/blob/release-1.5.0/
>>> flink-core/src/main/java/org/apache/flink/util/
>>> InstantiationUtil.java#L488
>>>
>>> Not sure how to resolve this problem. Looking for an advice. Let me know
>>> if more info is needed. Full stack is below. Thanks.
>>>
>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>> org.apache.flink.util.FlinkException: Failed to submit job
>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$
>>> handleRequest$3(JarRunHandler.java:141)
>>> at java.util.concurrent.CompletableFuture.uniExceptionally(
>>> CompletableFuture.java:870)
>>> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(
>>> CompletableFuture.java:852)
>>> at java.util.concurrent.CompletableFuture.postComplete(
>>> CompletableFuture.java:474)
>>> at java.util.concurrent.CompletableFuture.completeExceptionally(
>>> CompletableFuture.java:1977)
>>> at org.apache.flink.runtime.concurrent.FutureUtils$1.
>>> onComplete(FutureUtils.java:811)
>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.
>>> execute(Executors.java:83)
>>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.
>>> scala:44)
>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(
>>> Promise.scala:252)
>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$
>>> pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$
>>> pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(
>>> BatchingExecutor.scala:55)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
>>> apply$mcV$sp(BatchingExecutor.scala:91)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
>>> apply(BatchingExecutor.scala:91)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
>>> apply(BatchingExecutor.scala:91)
>>> at scala.concurrent.BlockContext$.withBlockContext(
>>> BlockContext.scala:72)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch.run(
>>> BatchingExecutor.scala:90)
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
>>> AbstractDispatcher.scala:415)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>>> runTask(ForkJoinPool.java:1339)
>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>> ForkJoinPool.java:1979)
>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>>> ForkJoinWorkerThread.java:107)
>>> Caused by: java.util.concurrent.CompletionException:
>>> org.apache.flink.util.FlinkException: Failed to submit job
>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>> at java.util.concurrent.CompletableFuture.encodeRelay(
>>> CompletableFuture.java:326)
>>> at java.util.concurrent.CompletableFuture.completeRelay(
>>> CompletableFuture.java:338)
>>> at java.util.concurrent.CompletableFuture.uniRelay(
>>> CompletableFuture.java:911)
>>> at java.util.concurrent.CompletableFuture$UniRelay.
>>> tryFire(CompletableFuture.java:899)
>>> ... 29 more
>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>> submitJob(Dispatcher.java:254)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(
>>> NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>> DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
>>> AkkaRpcActor.java:247)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.
>>> handleRpcMessage(AkkaRpcActor.java:162)
>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.
>>> handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(
>>> AkkaRpcActor.java:142)
>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.
>>> onReceive(FencedAkkaRpcActor.java:40)
>>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(
>>> UntypedActor.scala:165)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>> ... 4 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>>> not set up JobManager
>>> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
>>> init>(JobManagerRunner.java:169)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher$
>>> DefaultJobManagerRunnerFactory.createJobManagerRunner(
>>> Dispatcher.java:885)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>> createJobManagerRunner(Dispatcher.java:287)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(
>>> Dispatcher.java:277)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>> persistAndRunJob(Dispatcher.java:262)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>> submitJob(Dispatcher.java:249)
>>> ... 21 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>> Cannot initialize task 'DataSink (org.apache.flink.api.java.
>>> hadoop.mapreduce.HadoopOutputFormat@5ebe8168)': Deserializing the
>>> OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.
>>> HadoopOutputFormat@5ebe8168) failed: unread block data
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
>>> buildGraph(ExecutionGraphBuilder.java:220)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
>>> buildGraph(ExecutionGraphBuilder.java:100)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(
>>> JobMaster.java:1150)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.
>>> createAndRestoreExecutionGraph(JobMaster.java:1130)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(
>>> JobMaster.java:298)
>>> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
>>> init>(JobManagerRunner.java:151)
>>> ... 26 more
>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>> failed: unread block data
>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.
>>> initializeOnMaster(OutputFormatVertex.java:63)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
>>> buildGraph(ExecutionGraphBuilder.java:216)
>>> ... 31 more
>>> Caused by: java.lang.IllegalStateException: unread block data
>>> at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(
>>> ObjectInputStream.java:2781)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>> at java.io.ObjectInputStream.defaultReadFields(
>>> ObjectInputStream.java:2285)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>> at java.io.ObjectInputStream.readOrdinaryObject(
>>> ObjectInputStream.java:2067)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>> InstantiationUtil.java:488)
>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>> InstantiationUtil.java:475)
>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>> InstantiationUtil.java:463)
>>> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
>>> InstantiationUtil.java:424)
>>> at org.apache.flink.runtime.operators.util.TaskConfig.
>>> getStubWrapper(TaskConfig.java:288)
>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.
>>> initializeOnMaster(OutputFormatVertex.java:60)
>>> ... 32 more
>>>
>>>
Re: Flink 1.5 batch job fails to start
Posted by Alex Vinnik <al...@gmail.com>.
Hi Till,
Thanks for responding. Below are the entrypoint logs. One thing I noticed is
that they report "Hadoop version: 2.7.3", while my fat jar is built with the
Hadoop 2.8 client. Could that be the reason for the error? If so, how can I
use the same Hadoop version (2.8) on the Flink server side? BTW, the job runs
fine locally, reading from the same s3a buckets, when executed using
createLocalEnvironment via java -jar my-fat.jar
--input s3a://foo --output s3a://bar
Regarding the Java version: the job is submitted via the Flink UI, so it
should not be a problem.
Thanks a lot in advance.
2018-07-24T12:09:38.083+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--------------------------------------------------------------------------------
2018-07-24T12:09:38.085+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting
StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108,
Date:24.05.2018 @ 14:54:44 UTC)
2018-07-24T12:09:38.085+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO OS current
user: flink
2018-07-24T12:09:38.844+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Current
Hadoop/Kerberos user: flink
2018-07-24T12:09:38.844+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM: OpenJDK
64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-24T12:09:38.844+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Maximum heap
size: 1963 MiBytes
2018-07-24T12:09:38.844+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JAVA_HOME:
/docker-java-home/jre
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop
version: 2.7.3
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM Options:
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xms2048m
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xmx2048m
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Program
Arguments:
2018-07-24T12:09:38.852+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--configDir
2018-07-24T12:09:38.852+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
/opt/flink/conf
2018-07-24T12:09:38.852+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--executionMode
2018-07-24T12:09:38.853+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
2018-07-24T12:09:38.853+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --host
2018-07-24T12:09:38.853+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
2018-07-24T12:09:38.853+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Classpath:
/opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
2018-07-24T12:09:38.853+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--------------------------------------------------------------------------------
2018-07-24T12:09:38.854+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Registered
UNIX signal handlers for [TERM, HUP, INT]
2018-07-24T12:09:38.895+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting
StandaloneSessionClusterEntrypoint.
2018-07-24T12:09:38.895+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install
default filesystem.
2018-07-24T12:09:38.927+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install
security context.
2018-07-24T12:09:39.034+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Initializing
cluster services.
2018-07-24T12:09:39.059+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Trying to
start actor system at flink-jobmanager:6123
2018-07-24T12:09:40.335+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Actor system
started at akka.tcp://flink@flink-jobmanager:6123
On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org> wrote:
> Hi Alex,
>
> I'm not entirely sure what causes this problem because it is the first
> time I see it.
>
> First question would be if the problem also arises if using a different
> Hadoop version.
>
> Are you using the same Java versions on the client as well as on the
> server?
>
> Could you provide us with the cluster entrypoint logs?
>
> Cheers,
> Till
>
> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com> wrote:
>
>> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink 1.5
>> and getting a weird exception.
>>
>> Job reads json from s3a and writes parquet files to s3a with avro model.
>> Job is uber jar file built with hadoop-aws-2.8.0 in order to have access to
>> S3AFileSystem class.
>>
>> Fails here
>>
>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>> with
>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>> failed: unread block data
>>
>> To be exact it fails right on that line.
>>
>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>
>> Not sure how to resolve this problem. Looking for an advice. Let me know
>> if more info is needed. Full stack is below. Thanks.
>>
>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>> org.apache.flink.util.FlinkException: Failed to submit job
>> 13a1478cbc7ec20f93f9ee0947856bfd.
>> at
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
>> at
>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>> at
>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at
>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>> at
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>> at
>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>> at
>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at
>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>> at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>> at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>> at
>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.util.concurrent.CompletionException:
>> org.apache.flink.util.FlinkException: Failed to submit job
>> 13a1478cbc7ec20f93f9ee0947856bfd.
>> at
>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>> at
>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>> at
>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>> at
>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>> ... 29 more
>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>> 13a1478cbc7ec20f93f9ee0947856bfd.
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> ... 4 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>> ... 21 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
>> initialize task 'DataSink
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
>> Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>> ... 26 more
>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>> ... 31 more
>> Caused by: java.lang.IllegalStateException: unread block data
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>> at
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>> at
>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>> ... 32 more
>>
>>
Re: Flink 1.5 batch job fails to start
Posted by Till Rohrmann <tr...@apache.org>.
Hi Alex,
I'm not entirely sure what causes this problem because this is the first time
I have seen it.
The first question would be whether the problem also arises when using a
different Hadoop version.
Are you using the same Java versions on the client as well as on the server?
Could you provide us with the cluster entrypoint logs?
Cheers,
Till
On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com> wrote:
> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink 1.5
> and getting a weird exception.
>
> Job reads json from s3a and writes parquet files to s3a with avro model.
> Job is uber jar file built with hadoop-aws-2.8.0 in order to have access to
> S3AFileSystem class.
>
> Fails here
>
> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
> with
> Caused by: java.lang.Exception: Deserializing the OutputFormat
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
> failed: unread block data
>
> To be exact it fails right on that line.
>
> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>
> Not sure how to resolve this problem. Looking for an advice. Let me know
> if more info is needed. Full stack is below. Thanks.
>
> org.apache.flink.runtime.rest.handler.RestHandlerException:
> org.apache.flink.util.FlinkException: Failed to submit job
> 13a1478cbc7ec20f93f9ee0947856bfd.
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
> at akka.dispatch.OnComplete.internal(Future.scala:258)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkException: Failed to submit job
> 13a1478cbc7ec20f93f9ee0947856bfd.
> at
> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
> at
> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
> at
> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
> at
> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
> ... 29 more
> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
> 13a1478cbc7ec20f93f9ee0947856bfd.
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> ... 4 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
> ... 21 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
> initialize task 'DataSink
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
> Deserializing the OutputFormat
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
> failed: unread block data
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
> ... 26 more
> Caused by: java.lang.Exception: Deserializing the OutputFormat
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
> failed: unread block data
> at
> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
> ... 31 more
> Caused by: java.lang.IllegalStateException: unread block data
> at
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
> at
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
> at
> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
> ... 32 more
>
>