You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Alex Vinnik <al...@gmail.com> on 2018/07/24 02:56:26 UTC

Flink 1.5 batch job fails to start

Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink 1.5
and getting a weird exception.

The job reads JSON from s3a and writes parquet files to s3a with an avro model.
The job is an uber jar file built with hadoop-aws-2.8.0 in order to have access
to the S3AFileSystem class.

Fails here
https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
with
Caused by: java.lang.Exception: Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
failed: unread block data

To be exact it fails right on that line.
https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488

Not sure how to resolve this problem. Looking for advice. Let me know if
more info is needed. The full stack trace is below. Thanks.

org.apache.flink.runtime.rest.handler.RestHandlerException:
org.apache.flink.util.FlinkException: Failed to submit job
13a1478cbc7ec20f93f9ee0947856bfd.
at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkException: Failed to submit job
13a1478cbc7ec20f93f9ee0947856bfd.
at
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 29 more
Caused by: org.apache.flink.util.FlinkException: Failed to submit job
13a1478cbc7ec20f93f9ee0947856bfd.
at
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
set up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at
org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at
org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at
org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
initialize task 'DataSink
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
failed: unread block data
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
failed: unread block data
at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more

Re: Flink 1.5 batch job fails to start

Posted by Alex Vinnik <al...@gmail.com>.
Hi Vino,

Data is OK, I double-checked. Input is plain JSON and it can be processed by
the same code compiled and run on Flink 1.3.1. Thanks for the hint about avro
and parquet versions. Got my fat jar synced up with Flink 1.5.1
avro/parquet versions. Hope was high that it would help to resolve the
problem. And one run of the job actually was successful, but it started
failing after that with the same problem. Weird. Will continue to poke
around, feels like I am so close :)

Best,
-Alex

On Tue, Jul 24, 2018 at 9:08 PM vino yang <ya...@gmail.com> wrote:

> Hi Alex,
>
> Is it possible that the data has been corrupted?
>
> Or have you confirmed that the avro version is consistent in different
> Flink versions?
>
> Also, if you don't upgrade Flink and still use version 1.3.1, can it be
> recovered?
>
> Thanks, vino.
>
>
> 2018-07-25 8:32 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>
>> Vino,
>>
>> Upgraded flink to Hadoop 2.8.1
>>
>> $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
>> entrypoint | grep 'Hadoop version'
>> 2018-07-25T00:19:46.142+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
>> version: 2.8.1
>>
>> but job still fails to start
>>
>> Ideas?
>>
>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>> d84cccd3bffcba1f243352a5e5ef99a9.
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> ... 4 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>> ... 21 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
>> initialize task 'DataSink
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)':
>> Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>> ... 26 more
>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>> ... 31 more
>> Caused by: java.lang.IllegalStateException: unread block data
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>> at
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>> at
>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>> ... 32 more
>>
>>
>> On Tue, Jul 24, 2018 at 10:32 AM vino yang <ya...@gmail.com> wrote:
>>
>>> Hi Alex,
>>>
>>> Based on your log information, the potential reason is the Hadoop version.
>>> To check whether the exception comes from mismatched Hadoop versions, I
>>> suggest you match the Hadoop version on both sides.
>>>
>>> You can :
>>>
>>> 1. Upgrade the Hadoop version which the Flink cluster depends on; Flink's
>>> official website provides a binary bundled with Hadoop 2.8.[1]
>>> 2. Downgrade your fat jar's Hadoop client dependency version to match
>>> the Flink cluster's Hadoop dependency version.
>>>
>>> [1]:
>>> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz
>>>
>>> Thanks, vino.
>>>
>>> 2018-07-24 22:59 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>>>
>>>> Hi Till,
>>>>
>>>> Thanks for responding. Below are the entrypoint logs. One thing I noticed
>>>> is "Hadoop version: 2.7.3". My fat jar is built with the hadoop 2.8 client.
>>>> Could it be the reason for that error? If so, how can I use the same hadoop
>>>> version 2.8 on the flink server side?  BTW the job runs fine locally reading from
>>>> the same s3a buckets when executed using createLocalEnvironment via java
>>>> -jar my-fat.jar --input s3a://foo --output s3a://bar
>>>>
>>>> Regarding java version. The job is submitted via Flink UI, so it should
>>>> not be a problem.
>>>>
>>>> Thanks a lot in advance.
>>>>
>>>> 2018-07-24T12:09:38.083+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --------------------------------------------------------------------------------
>>>> 2018-07-24T12:09:38.085+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting
>>>> StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108,
>>>> Date:24.05.2018 @ 14:54:44 UTC)
>>>> 2018-07-24T12:09:38.085+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current
>>>> user: flink
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current
>>>> Hadoop/Kerberos user: flink
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK
>>>> 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap
>>>> size: 1963 MiBytes
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME:
>>>> /docker-java-home/jre
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
>>>> version: 2.7.3
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dcom.amazonaws.sdk.disableCertChecking
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program
>>>> Arguments:
>>>> 2018-07-24T12:09:38.852+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --configDir
>>>> 2018-07-24T12:09:38.852+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> /opt/flink/conf
>>>> 2018-07-24T12:09:38.852+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --executionMode
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath:
>>>> /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --------------------------------------------------------------------------------
>>>> 2018-07-24T12:09:38.854+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered
>>>> UNIX signal handlers for [TERM, HUP, INT]
>>>> 2018-07-24T12:09:38.895+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting
>>>> StandaloneSessionClusterEntrypoint.
>>>> 2018-07-24T12:09:38.895+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
>>>> default filesystem.
>>>> 2018-07-24T12:09:38.927+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
>>>> security context.
>>>> 2018-07-24T12:09:39.034+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing
>>>> cluster services.
>>>> 2018-07-24T12:09:39.059+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to
>>>> start actor system at flink-jobmanager:6123
>>>> 2018-07-24T12:09:40.335+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system
>>>> started at akka.tcp://flink@flink-jobmanager:6123
>>>>
>>>> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Alex,
>>>>>
>>>>> I'm not entirely sure what causes this problem because it is the first
>>>>> time I see it.
>>>>>
>>>>> First question would be if the problem also arises if using a
>>>>> different Hadoop version.
>>>>>
>>>>> Are you using the same Java versions on the client as well as on the
>>>>> server?
>>>>>
>>>>> Could you provide us with the cluster entrypoint logs?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink
>>>>>> 1.5 and getting a weird exception.
>>>>>>
>>>>>> Job reads json from s3a and writes parquet files to s3a with avro
>>>>>> model. Job is uber jar file built with hadoop-aws-2.8.0 in order to have
>>>>>> access to S3AFileSystem class.
>>>>>>
>>>>>> Fails here
>>>>>>
>>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>>>>>> with
>>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>> failed: unread block data
>>>>>>
>>>>>> To be exact it fails right on that line.
>>>>>>
>>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>>>>>
>>>>>> Not sure how to resolve this problem. Looking for an advice. Let me
>>>>>> know if more info is needed. Full stack is below. Thanks.
>>>>>>
>>>>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>> at
>>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>>> at
>>>>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
>>>>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>>>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>> at
>>>>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>>>>>> at
>>>>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>>>> at
>>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>>>>> at
>>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>>>>> at
>>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>> at
>>>>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>>>>> at
>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> Caused by: java.util.concurrent.CompletionException:
>>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>>>> ... 29 more
>>>>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>>> at
>>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>> ... 4 more
>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>> Could not set up JobManager
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>>>>>> ... 21 more
>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>> Cannot initialize task 'DataSink
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
>>>>>> Deserializing the OutputFormat
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>> failed: unread block data
>>>>>> at
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>>>>> at
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>>>>>> ... 26 more
>>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>> failed: unread block data
>>>>>> at
>>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>>>>> at
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>>>>> ... 31 more
>>>>>> Caused by: java.lang.IllegalStateException: unread block data
>>>>>> at
>>>>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>>>>> at
>>>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>>>>>> at
>>>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>>>>> at
>>>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>>>>> ... 32 more
>>>>>>
>>>>>>
>>>
>

Re: Flink 1.5 batch job fails to start

Posted by Alex Vinnik <al...@gmail.com>.
Hi Till,

Server start up entrypoint log

2018-07-25T12:19:12.268+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--------------------------------------------------------------------------------
2018-07-25T12:19:12.271+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting
StandaloneSessionClusterEntrypoint (Version: <unknown>, Rev:3488f8b,
Date:10.07.2018 @ 11:51:27 GMT)
2018-07-25T12:19:12.271+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current
user: flink
2018-07-25T12:19:18.599+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current
Hadoop/Kerberos user: flink
2018-07-25T12:19:18.607+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK
64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-25T12:19:18.607+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap
size: 1963 MiBytes
2018-07-25T12:19:18.607+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME:
/docker-java-home/jre
2018-07-25T12:19:18.615+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
version: 2.8.1
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program
Arguments:
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--configDir
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
/opt/flink/conf
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--executionMode
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath:
/opt/flink/lib/flink-metrics-datadog-1.5.1.jar:/opt/flink/lib/flink-metrics-prometheus-1.5.1.jar:/opt/flink/lib/flink-python_2.11-1.5.1.jar:/opt/flink/lib/flink-s3-fs-hadoop-1.5.1.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/peer-group-transform-all.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.1.jar:::
2018-07-25T12:19:18.616+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--------------------------------------------------------------------------------
2018-07-25T12:19:18.620+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered
UNIX signal handlers for [TERM, HUP, INT]
2018-07-25T12:19:18.853+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting
StandaloneSessionClusterEntrypoint.
2018-07-25T12:19:18.854+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
default filesystem.
2018-07-25T12:19:19.045+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
security context.
2018-07-25T12:19:19.520+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing
cluster services.
2018-07-25T12:19:19.601+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to
start actor system at flink-jobmanager:6123
2018-07-25T12:19:24.768+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system
started at akka.tcp://flink@flink-jobmanager:6123

Below is the Flink client log file:
docker exec -it flink-jobmanager flink run /fat.jar --input-path
s3a://json-input --output-path s3a://parquet-output

2018-07-25 13:40:30,752 INFO  org.apache.flink.client.cli.CliFrontend
                 -
--------------------------------------------------------------------------------
2018-07-25 13:40:30,756 INFO  org.apache.flink.client.cli.CliFrontend
                 -  Starting Command Line Client (Version: <unknown>,
Rev:3488f8b, Date:10.07.2018 @ 11:51:27 GMT)
2018-07-25 13:40:30,756 INFO  org.apache.flink.client.cli.CliFrontend
                 -  OS current user: root
2018-07-25 13:40:31,797 INFO  org.apache.flink.client.cli.CliFrontend
                 -  Current Hadoop/Kerberos user: root
2018-07-25 13:40:31,797 INFO  org.apache.flink.client.cli.CliFrontend
                 -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation -
1.8/25.171-b11
2018-07-25 13:40:31,797 INFO  org.apache.flink.client.cli.CliFrontend
                 -  Maximum heap size: 2667 MiBytes
2018-07-25 13:40:31,797 INFO  org.apache.flink.client.cli.CliFrontend
                 -  JAVA_HOME: /docker-java-home/jre
2018-07-25 13:40:31,800 INFO  org.apache.flink.client.cli.CliFrontend
                 -  Hadoop version: 2.8.1
2018-07-25 13:40:31,800 INFO  org.apache.flink.client.cli.CliFrontend
                 -  JVM Options:
2018-07-25 13:40:31,800 INFO  org.apache.flink.client.cli.CliFrontend
                 -
 -Dlog.file=/opt/flink/log/flink--client-d9831b2552d5.log
2018-07-25 13:40:31,800 INFO  org.apache.flink.client.cli.CliFrontend
                 -
 -Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties
2018-07-25 13:40:31,800 INFO  org.apache.flink.client.cli.CliFrontend
                 -
 -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend
                 -  Program Arguments:
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend
                 -     run
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend
                 -     /peer-group-transform-all.jar
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend
                 -     --input-path
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend
                 -     s3a://odin-tmp/explore-data/20180711104638/
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend
                 -     --output-path
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend
                 -     s3a://odin-tmp/transformed/20180712/IDA-1917-1
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend
                 -  Classpath:
/opt/flink/lib/flink-metrics-datadog-1.5.1.jar:/opt/flink/lib/flink-metrics-prometheus-1.5.1.jar:/opt/flink/lib/flink-python_2.11-1.5.1.jar:/opt/flink/lib/flink-s3-fs-hadoop-1.5.1.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/peer-group-transform-all.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.1.jar:::
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend
                 -
--------------------------------------------------------------------------------
2018-07-25 13:40:31,806 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:31,807 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:31,807 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:31,807 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:31,807 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:31,807 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: parallelism.default, 1
2018-07-25 13:40:31,807 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: rest.port, 8081
2018-07-25 13:40:31,808 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:31,808 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:31,808 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:31,808 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: env.java.opts,
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:31,808 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.backend, filesystem
2018-07-25 13:40:31,808 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:31,809 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: blob.server.port, 6124
2018-07-25 13:40:31,809 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: query.server.port, 6125
2018-07-25 13:40:32,151 INFO
org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user
set to root (auth:SIMPLE)
2018-07-25 13:40:32,190 INFO  org.apache.flink.client.cli.CliFrontend
                 - Running 'run' command.
2018-07-25 13:40:32,197 INFO  org.apache.flink.client.cli.CliFrontend
                 - Building program from JAR file
2018-07-25 13:40:32,363 WARN  org.apache.flink.configuration.Configuration
                - Config uses deprecated configuration key
'jobmanager.rpc.address' instead of proper key 'rest.address'
2018-07-25 13:40:32,875 INFO  org.apache.flink.runtime.rest.RestClient
                - Rest client endpoint started.
2018-07-25 13:40:32,879 INFO  org.apache.flink.client.cli.CliFrontend
                 - Starting execution of program
2018-07-25 13:40:32,879 INFO
org.apache.flink.client.program.rest.RestClusterClient        - Starting
program in interactive mode (detached: false)
2018-07-25 13:40:32,943 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:32,943 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:32,943 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:32,943 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:32,943 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:32,943 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: parallelism.default, 1
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: rest.port, 8081
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: env.java.opts,
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.backend, filesystem
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:32,944 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: blob.server.port, 6124
2018-07-25 13:40:32,945 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: query.server.port, 6125
2018-07-25 13:40:32,950 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:32,950 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:32,950 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:32,950 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:32,950 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:32,950 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: parallelism.default, 1
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: rest.port, 8081
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: env.java.opts,
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.backend, filesystem
2018-07-25 13:40:32,951 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:32,952 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: blob.server.port, 6124
2018-07-25 13:40:32,952 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: query.server.port, 6125
2018-07-25 13:40:32,976 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:32,985 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: parallelism.default, 1
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: rest.port, 8081
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: env.java.opts,
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.backend, filesystem
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:32,986 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: blob.server.port, 6124
2018-07-25 13:40:32,987 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: query.server.port, 6125
2018-07-25 13:40:33,212 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - class
com.fasterxml.jackson.databind.node.ObjectNode does not contain a getter
for field _children
2018-07-25 13:40:33,212 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - class
com.fasterxml.jackson.databind.node.ObjectNode does not contain a setter
for field _children
2018-07-25 13:40:33,212 INFO
org.apache.flink.api.java.typeutils.TypeExtractor             - Class class
com.fasterxml.jackson.databind.node.ObjectNode cannot be used as a POJO
type because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.
2018-07-25 13:40:35,676 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: parallelism.default, 1
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: rest.port, 8081
2018-07-25 13:40:35,677 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: env.java.opts,
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.backend, filesystem
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: blob.server.port, 6124
2018-07-25 13:40:35,678 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: query.server.port, 6125
2018-07-25 13:40:35,738 INFO
org.apache.flink.api.java.ExecutionEnvironment                - The job has
6 registered types and 0 default Kryo serializers
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: parallelism.default, 1
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: rest.port, 8081
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:53,672 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:53,673 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:53,673 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: env.java.opts,
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:53,673 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.backend, filesystem
2018-07-25 13:40:53,673 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:53,673 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: blob.server.port, 6124
2018-07-25 13:40:53,673 INFO
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: query.server.port, 6125
2018-07-25 13:40:53,765 INFO
org.apache.flink.client.program.rest.RestClusterClient        - Submitting
job dc8565764f025be19b89ee98c1a398f6 (detached: false).
2018-07-25 13:40:58,885 INFO  org.apache.flink.runtime.rest.RestClient
                - Shutting down rest endpoint.
2018-07-25 13:40:58,887 INFO  org.apache.flink.runtime.rest.RestClient
                - Rest endpoint shutdown complete.
2018-07-25 13:40:58,890 ERROR org.apache.flink.client.cli.CliFrontend
                 - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Could not
retrieve the execution result.
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:257)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:452)
at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
at
com.sailpoint.ida.data.jobs.peergrouptransform.PeerGroupTransformJob.main(PeerGroupTransformJob.java:116)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
to submit JobGraph.
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
complete the operation. Exception is not retryable.
at
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 12 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
Could not complete the operation. Exception is not retryable.
... 10 more
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.rest.util.RestClientException: [Job submission
failed.]
at
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 4 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job
submission failed.]
at
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:309)
at
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:293)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
... 5 more

Thanks, Alex


On Wed, Jul 25, 2018 at 2:22 AM Till Rohrmann <tr...@apache.org> wrote:

> Hi Alex,
>
> could you share with us the full logs of the client and the cluster
> entrypoint? That would be tremendously helpful.
>
> Cheers,
> Till
>
> On Wed, Jul 25, 2018 at 4:08 AM vino yang <ya...@gmail.com> wrote:
>
>> Hi Alex,
>>
>> Is it possible that the data has been corrupted?
>>
>> Or have you confirmed that the avro version is consistent in different
>> Flink versions?
>>
>> Also, if you don't upgrade Flink and still use version 1.3.1, can it be
>> recovered?
>>
>> Thanks, vino.
>>
>>
>> 2018-07-25 8:32 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>>
>>> Vino,
>>>
>>> I upgraded Flink to Hadoop 2.8.1.
>>>
>>> $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
>>> entrypoint | grep 'Hadoop version'
>>> 2018-07-25T00:19:46.142+0000
>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
>>> version: 2.8.1
>>>
>>> but the job still fails to start.
>>>
>>> Ideas?
>>>
>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>>> d84cccd3bffcba1f243352a5e5ef99a9.
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>> at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>> at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>> at
>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>> ... 4 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>>> not set up JobManager
>>> at
>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>>> at
>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>>> ... 21 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
>>> initialize task 'DataSink
>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)':
>>> Deserializing the OutputFormat
>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>>> failed: unread block data
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>>> at
>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>>> ... 26 more
>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>>> failed: unread block data
>>> at
>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>> ... 31 more
>>> Caused by: java.lang.IllegalStateException: unread block data
>>> at
>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>> at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>>> at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>>> at
>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>>> at
>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>>> at
>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>> at
>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>> ... 32 more
>>>
>>>
>>> On Tue, Jul 24, 2018 at 10:32 AM vino yang <ya...@gmail.com>
>>> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> Based on your log information, the likely cause is a Hadoop version
>>>> mismatch. To check whether the exception comes from differing Hadoop
>>>> versions, I suggest you match the Hadoop version on both sides.
>>>>
>>>> You can :
>>>>
>>>> 1. Upgrade the Hadoop version that the Flink cluster depends on; Flink's
>>>> official website provides a binary bundled with Hadoop 2.8 [1].
>>>> 2. Downgrade your fat jar's Hadoop client dependency to match the
>>>> Flink cluster's Hadoop version.
>>>>
>>>> [1]:
>>>> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz
>>>>
>>>> Thanks, vino.
>>>>
>>>> 2018-07-24 22:59 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> Thanks for responding. Below are the entrypoint logs. One thing I noticed
>>>>> is "Hadoop version: 2.7.3". My fat jar is built with the Hadoop 2.8 client.
>>>>> Could that be the reason for the error? If so, how can I use the same Hadoop
>>>>> version 2.8 on the Flink server side? BTW, the job runs fine locally, reading
>>>>> from the same s3a buckets, when executed using createLocalEnvironment via java
>>>>> -jar my-fat.jar --input s3a://foo --output s3a://bar
>>>>>
>>>>> Regarding java version. The job is submitted via Flink UI, so it
>>>>> should not be a problem.
>>>>>
>>>>> Thanks a lot in advance.
>>>>>
>>>>> 2018-07-24T12:09:38.083+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> --------------------------------------------------------------------------------
>>>>> 2018-07-24T12:09:38.085+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting
>>>>> StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108,
>>>>> Date:24.05.2018 @ 14:54:44 UTC)
>>>>> 2018-07-24T12:09:38.085+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current
>>>>> user: flink
>>>>> 2018-07-24T12:09:38.844+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current
>>>>> Hadoop/Kerberos user: flink
>>>>> 2018-07-24T12:09:38.844+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK
>>>>> 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
>>>>> 2018-07-24T12:09:38.844+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap
>>>>> size: 1963 MiBytes
>>>>> 2018-07-24T12:09:38.844+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME:
>>>>> /docker-java-home/jre
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
>>>>> version: 2.7.3
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> -Dcom.amazonaws.sdk.disableCertChecking
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>>>>> 2018-07-24T12:09:38.851+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program
>>>>> Arguments:
>>>>> 2018-07-24T12:09:38.852+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> --configDir
>>>>> 2018-07-24T12:09:38.852+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> /opt/flink/conf
>>>>> 2018-07-24T12:09:38.852+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> --executionMode
>>>>> 2018-07-24T12:09:38.853+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
>>>>> 2018-07-24T12:09:38.853+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
>>>>> 2018-07-24T12:09:38.853+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
>>>>> 2018-07-24T12:09:38.853+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath:
>>>>> /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
>>>>> 2018-07-24T12:09:38.853+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>>> --------------------------------------------------------------------------------
>>>>> 2018-07-24T12:09:38.854+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered
>>>>> UNIX signal handlers for [TERM, HUP, INT]
>>>>> 2018-07-24T12:09:38.895+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting
>>>>> StandaloneSessionClusterEntrypoint.
>>>>> 2018-07-24T12:09:38.895+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
>>>>> default filesystem.
>>>>> 2018-07-24T12:09:38.927+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
>>>>> security context.
>>>>> 2018-07-24T12:09:39.034+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing
>>>>> cluster services.
>>>>> 2018-07-24T12:09:39.059+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to
>>>>> start actor system at flink-jobmanager:6123
>>>>> 2018-07-24T12:09:40.335+0000
>>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system
>>>>> started at akka.tcp://flink@flink-jobmanager:6123
>>>>>
>>>>> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Alex,
>>>>>>
>>>>>> I'm not entirely sure what causes this problem because this is the
>>>>>> first time I have seen it.
>>>>>>
>>>>>> First question would be if the problem also arises if using a
>>>>>> different Hadoop version.
>>>>>>
>>>>>> Are you using the same Java versions on the client as well as on the
>>>>>> server?
>>>>>>
>>>>>> Could you provide us with the cluster entrypoint logs?
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I am trying to migrate an existing job that runs fine on Flink 1.3.1 to
>>>>>>> Flink 1.5 and am getting a weird exception.
>>>>>>>
>>>>>>> The job reads JSON from s3a and writes Parquet files to s3a with an Avro
>>>>>>> model. The job is an uber jar built with hadoop-aws-2.8.0 in order to have
>>>>>>> access to the S3AFileSystem class.
>>>>>>>
>>>>>>> Fails here
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>>>>>>> with
>>>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>>> failed: unread block data
>>>>>>>
>>>>>>> To be exact it fails right on that line.
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>>>>>>
>>>>>>> Not sure how to resolve this problem. Looking for advice. Let me
>>>>>>> know if more info is needed. The full stack trace is below. Thanks.
>>>>>>>
>>>>>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>>> at
>>>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
>>>>>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>>>>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>>>>>>> at
>>>>>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>>>>> at
>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>>>>>> at
>>>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>>>>>> at
>>>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>>> at
>>>>>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>>>>> at
>>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>>>>> at
>>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>>> at
>>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>>> at
>>>>>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>>>>> at
>>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>>>>>> at
>>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>>>>>>> at
>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>> at
>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>> at
>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>> at
>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> Caused by: java.util.concurrent.CompletionException:
>>>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>>>>> at
>>>>>>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>>>>> ... 29 more
>>>>>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit
>>>>>>> job 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>>> at
>>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>> at
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>>>> at
>>>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>> ... 4 more
>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>>> Could not set up JobManager
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>>>>>>> ... 21 more
>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>>> Cannot initialize task 'DataSink
>>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
>>>>>>> Deserializing the OutputFormat
>>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>>> failed: unread block data
>>>>>>> at
>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>>>>>>> ... 26 more
>>>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>>> failed: unread block data
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>>>>>> ... 31 more
>>>>>>> Caused by: java.lang.IllegalStateException: unread block data
>>>>>>> at
>>>>>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>>>>>> at
>>>>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>>>>>>> at
>>>>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>>>>>> at
>>>>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>>>>> at
>>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>>>>>>> at
>>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>>>>>>> at
>>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>>>>>>> at
>>>>>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>>>>>> ... 32 more
>>>>>>>
>>>>>>>
>>>>
>>

Re: Flink 1.5 batch job fails to start

Posted by Till Rohrmann <tr...@apache.org>.
Hi Alex,

could you share with us the full logs of the client and the cluster
entrypoint? That would be tremendously helpful.

Cheers,
Till

On Wed, Jul 25, 2018 at 4:08 AM vino yang <ya...@gmail.com> wrote:

> Hi Alex,
>
> Is it possible that the data has been corrupted?
>
> Or have you confirmed that the Avro version is consistent across the
> different Flink versions?
>
> Also, if you don't upgrade Flink and still use version 1.3.1, can it be
> recovered?
>
> Thanks, vino.
>
>
> 2018-07-25 8:32 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>
>> Vino,
>>
>> Upgraded flink to Hadoop 2.8.1
>>
>> $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
>> entrypoint | grep 'Hadoop version'
>> 2018-07-25T00:19:46.142+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
>> version: 2.8.1
>>
>> but job still fails to start
>>
>> Ideas?
>>
>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>> d84cccd3bffcba1f243352a5e5ef99a9.
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> ... 4 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>> ... 21 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
>> initialize task 'DataSink
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)':
>> Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>> ... 26 more
>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>> ... 31 more
>> Caused by: java.lang.IllegalStateException: unread block data
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>> at
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>> at
>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>> ... 32 more
>>
>>
>> On Tue, Jul 24, 2018 at 10:32 AM vino yang <ya...@gmail.com> wrote:
>>
>>> Hi Alex,
>>>
>>> Based on your log information, the likely cause is a Hadoop version
>>> mismatch. To check whether the exception comes from differing Hadoop
>>> versions, I suggest you match the Hadoop version on both sides.
>>>
>>> You can :
>>>
>>> 1. Upgrade the Hadoop version that the Flink cluster depends on; Flink's
>>> official website provides a binary bundled with Hadoop 2.8 [1].
>>> 2. Downgrade your fat jar's Hadoop client dependency to match the
>>> Flink cluster's Hadoop version.
>>>
>>> [1]:
>>> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz
>>>
>>> Thanks, vino.
>>>
>>> 2018-07-24 22:59 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>>>
>>>> Hi Till,
>>>>
>>>> Thanks for responding. Below are the entrypoint logs. One thing I noticed
>>>> is "Hadoop version: 2.7.3". My fat jar is built with the Hadoop 2.8 client.
>>>> Could that be the reason for the error? If so, how can I use the same Hadoop
>>>> version 2.8 on the Flink server side? BTW, the job runs fine locally, reading
>>>> from the same s3a buckets, when executed using createLocalEnvironment via java
>>>> -jar my-fat.jar --input s3a://foo --output s3a://bar
>>>>
>>>> Regarding java version. The job is submitted via Flink UI, so it should
>>>> not be a problem.
>>>>
>>>> Thanks a lot in advance.
>>>>
>>>> 2018-07-24T12:09:38.083+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --------------------------------------------------------------------------------
>>>> 2018-07-24T12:09:38.085+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting
>>>> StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108,
>>>> Date:24.05.2018 @ 14:54:44 UTC)
>>>> 2018-07-24T12:09:38.085+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current
>>>> user: flink
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current
>>>> Hadoop/Kerberos user: flink
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK
>>>> 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap
>>>> size: 1963 MiBytes
>>>> 2018-07-24T12:09:38.844+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME:
>>>> /docker-java-home/jre
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
>>>> version: 2.7.3
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dcom.amazonaws.sdk.disableCertChecking
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>>>> 2018-07-24T12:09:38.851+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program
>>>> Arguments:
>>>> 2018-07-24T12:09:38.852+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --configDir
>>>> 2018-07-24T12:09:38.852+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> /opt/flink/conf
>>>> 2018-07-24T12:09:38.852+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --executionMode
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath:
>>>> /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
>>>> 2018-07-24T12:09:38.853+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>>>> --------------------------------------------------------------------------------
>>>> 2018-07-24T12:09:38.854+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered
>>>> UNIX signal handlers for [TERM, HUP, INT]
>>>> 2018-07-24T12:09:38.895+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting
>>>> StandaloneSessionClusterEntrypoint.
>>>> 2018-07-24T12:09:38.895+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
>>>> default filesystem.
>>>> 2018-07-24T12:09:38.927+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
>>>> security context.
>>>> 2018-07-24T12:09:39.034+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing
>>>> cluster services.
>>>> 2018-07-24T12:09:39.059+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to
>>>> start actor system at flink-jobmanager:6123
>>>> 2018-07-24T12:09:40.335+0000
>>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system
>>>> started at akka.tcp://flink@flink-jobmanager:6123
>>>>
>>>> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Alex,
>>>>>
>>>>> I'm not entirely sure what causes this problem because it is the first
>>>>> time I see it.
>>>>>
>>>>> First question would be if the problem also arises if using a
>>>>> different Hadoop version.
>>>>>
>>>>> Are you using the same Java versions on the client as well as on the
>>>>> server?
>>>>>
>>>>> Could you provide us with the cluster entrypoint logs?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink
>>>>>> 1.5 and getting a weird exception.
>>>>>>
>>>>>> Job reads json from s3a and writes parquet files to s3a with avro
>>>>>> model. Job is uber jar file built with hadoop-aws-2.8.0 in order to have
>>>>>> access to S3AFileSystem class.
>>>>>>
>>>>>> Fails here
>>>>>>
>>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>>>>>> with
>>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>> failed: unread block data
>>>>>>
>>>>>> To be exact it fails right on that line.
>>>>>>
>>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>>>>>
>>>>>> Not sure how to resolve this problem. Looking for advice. Let me
>>>>>> know if more info is needed. Full stack trace is below. Thanks.
>>>>>>
>>>>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>> at
>>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>>> at
>>>>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
>>>>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>>>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>> at
>>>>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>>>>>> at
>>>>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>>>> at
>>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>>>>> at
>>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>>>>> at
>>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>> at
>>>>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>>>> at
>>>>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>>>>> at
>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> Caused by: java.util.concurrent.CompletionException:
>>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>>>> ... 29 more
>>>>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>>> at
>>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>>> at
>>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>> ... 4 more
>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>> Could not set up JobManager
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>>>>>> at
>>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>>>>>> ... 21 more
>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>> Cannot initialize task 'DataSink
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
>>>>>> Deserializing the OutputFormat
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>> failed: unread block data
>>>>>> at
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>>>>> at
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>>>>>> ... 26 more
>>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>>>> failed: unread block data
>>>>>> at
>>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>>>>> at
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>>>>> ... 31 more
>>>>>> Caused by: java.lang.IllegalStateException: unread block data
>>>>>> at
>>>>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>>>>> at
>>>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>>>>>> at
>>>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>>>>> at
>>>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>>>>>> at
>>>>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>>>>> at
>>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>>>>> ... 32 more
>>>>>>
>>>>>>
>>>
>

Re: Flink 1.5 batch job fails to start

Posted by vino yang <ya...@gmail.com>.
Hi Alex,

Is it possible that the data has been corrupted?

Or have you confirmed that the Avro version is consistent across the
different Flink versions?

Also, if you do not upgrade Flink and keep using version 1.3.1, does the
job run successfully again?

Thanks, vino.


2018-07-25 8:32 GMT+08:00 Alex Vinnik <al...@gmail.com>:

> Vino,
>
> Upgraded flink to Hadoop 2.8.1
>
> $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
> entrypoint | grep 'Hadoop version'
> 2018-07-25T00:19:46.142+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Hadoop version: 2.8.1
>
> but job still fails to start
>
> Ideas?
>
> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
> d84cccd3bffcba1f243352a5e5ef99a9.
> at org.apache.flink.runtime.dispatcher.Dispatcher.
> submitJob(Dispatcher.java:254)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
> AkkaRpcActor.java:247)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.
> handleRpcMessage(AkkaRpcActor.java:162)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(
> FencedAkkaRpcActor.java:70)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(
> AkkaRpcActor.java:142)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.
> onReceive(FencedAkkaRpcActor.java:40)
> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(
> UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> ... 4 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
> init>(JobManagerRunner.java:169)
> at org.apache.flink.runtime.dispatcher.Dispatcher$
> DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
> at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(
> Dispatcher.java:287)
> at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(
> Dispatcher.java:277)
> at org.apache.flink.runtime.dispatcher.Dispatcher.
> persistAndRunJob(Dispatcher.java:262)
> at org.apache.flink.runtime.dispatcher.Dispatcher.
> submitJob(Dispatcher.java:249)
> ... 21 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
> initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.
> HadoopOutputFormat@a3123a9)': Deserializing the OutputFormat
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
> failed: unread block data
> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
> buildGraph(ExecutionGraphBuilder.java:220)
> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
> buildGraph(ExecutionGraphBuilder.java:100)
> at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(
> JobMaster.java:1150)
> at org.apache.flink.runtime.jobmaster.JobMaster.
> createAndRestoreExecutionGraph(JobMaster.java:1130)
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
> init>(JobManagerRunner.java:151)
> ... 26 more
> Caused by: java.lang.Exception: Deserializing the OutputFormat
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
> failed: unread block data
> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.
> initializeOnMaster(OutputFormatVertex.java:63)
> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
> buildGraph(ExecutionGraphBuilder.java:216)
> ... 31 more
> Caused by: java.lang.IllegalStateException: unread block data
> at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(
> ObjectInputStream.java:2781)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2285)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:2067)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:488)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:475)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:463)
> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
> InstantiationUtil.java:424)
> at org.apache.flink.runtime.operators.util.TaskConfig.
> getStubWrapper(TaskConfig.java:288)
> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.
> initializeOnMaster(OutputFormatVertex.java:60)
> ... 32 more
>
>
> On Tue, Jul 24, 2018 at 10:32 AM vino yang <ya...@gmail.com> wrote:
>
>> Hi Alex,
>>
>> Based on your log information, a potential cause is the Hadoop version
>> mismatch. To check whether the exception comes from the differing Hadoop
>> versions, I suggest you use the same Hadoop version on both sides.
>>
>> You can either:
>>
>> 1. Upgrade the Hadoop version that the Flink cluster depends on; Flink's
>> official website provides a binary bundled with Hadoop 2.8 [1].
>> 2. Downgrade the Hadoop client dependency in your fat jar to match the
>> Hadoop version that the Flink cluster depends on.
>>
>> [1]: http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz
>>
>> Thanks, vino.
>>
>> 2018-07-24 22:59 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>>
>>> Hi Till,
>>>
>>> Thanks for responding. Below are the entrypoint logs. One thing I noticed
>>> is "Hadoop version: 2.7.3". My fat jar is built with the Hadoop 2.8 client.
>>> Could that be the reason for the error? If so, how can I use the same Hadoop
>>> version 2.8 on the Flink server side? BTW, the job runs fine locally, reading
>>> from the same s3a buckets, when executed using createLocalEnvironment via
>>> java -jar my-fat.jar --input s3a://foo --output s3a://bar
>>>
>>> Regarding java version. The job is submitted via Flink UI, so it should
>>> not be a problem.
>>>
>>> Thanks a lot in advance.
>>>
>>> 2018-07-24T12:09:38.083+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO  ------------------------------------------------------------
>>> --------------------
>>> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO   Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0,
>>> Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
>>> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO   OS current user: flink
>>> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO   Current Hadoop/Kerberos user: flink
>>> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO   JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
>>> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO   Maximum heap size: 1963 MiBytes
>>> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO   JAVA_HOME: /docker-java-home/jre
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO   Hadoop version: 2.7.3
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO   JVM Options:
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO      -Xms2048m
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO      -Xmx2048m
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO      -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.
>>> disableCertChecking
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO      -Dcom.amazonaws.sdk.disableCertChecking
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO      -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,
>>> address=5015
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO      -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.
>>> properties
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO      -Dlogback.configurationFile=file:/opt/flink/conf/logback-
>>> console.xml
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO   Program Arguments:
>>> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO      --configDir
>>> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO      /opt/flink/conf
>>> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO      --executionMode
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO      cluster
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO      --host
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO      cluster
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO   Classpath: /opt/flink/lib/flink-metrics-
>>> datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.
>>> jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/
>>> flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/
>>> flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.
>>> 7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO  ------------------------------------------------------------
>>> --------------------
>>> 2018-07-24T12:09:38.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO  Registered UNIX signal handlers for [TERM, HUP, INT]
>>> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO  Starting StandaloneSessionClusterEntrypoint.
>>> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO  Install default filesystem.
>>> 2018-07-24T12:09:38.927+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO  Install security context.
>>> 2018-07-24T12:09:39.034+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO  Initializing cluster services.
>>> 2018-07-24T12:09:39.059+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO  Trying to start actor system at flink-jobmanager:6123
>>> 2018-07-24T12:09:40.335+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
>>> INFO  Actor system started at akka.tcp://flink@flink-jobmanager:6123
>>>
>>> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> I'm not entirely sure what causes this problem because it is the first
>>>> time I see it.
>>>>
>>>> First question would be if the problem also arises if using a different
>>>> Hadoop version.
>>>>
>>>> Are you using the same Java versions on the client as well as on the
>>>> server?
>>>>
>>>> Could you provide us with the cluster entrypoint logs?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com>
>>>> wrote:
>>>>
>>>>> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink
>>>>> 1.5 and getting a weird exception.
>>>>>
>>>>> Job reads json from s3a and writes parquet files to s3a with avro
>>>>> model. Job is uber jar file built with hadoop-aws-2.8.0 in order to have
>>>>> access to S3AFileSystem class.
>>>>>
>>>>> Fails here
>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>>>>> with
>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>> (org.apache.flink.api.java.hadoop.mapreduce.
>>>>> HadoopOutputFormat@5ebe8168) failed: unread block data
>>>>>
>>>>> To be exact it fails right on that line.
>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>>>>
>>>>> Not sure how to resolve this problem. Looking for advice. Let me
>>>>> know if more info is needed. Full stack trace is below. Thanks.
>>>>>
>>>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$
>>>>> handleRequest$3(JarRunHandler.java:141)
>>>>> at java.util.concurrent.CompletableFuture.uniExceptionally(
>>>>> CompletableFuture.java:870)
>>>>> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(
>>>>> CompletableFuture.java:852)
>>>>> at java.util.concurrent.CompletableFuture.postComplete(
>>>>> CompletableFuture.java:474)
>>>>> at java.util.concurrent.CompletableFuture.completeExceptionally(
>>>>> CompletableFuture.java:1977)
>>>>> at org.apache.flink.runtime.concurrent.FutureUtils$1.
>>>>> onComplete(FutureUtils.java:811)
>>>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>> at org.apache.flink.runtime.concurrent.Executors$
>>>>> DirectExecutionContext.execute(Executors.java:83)
>>>>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.
>>>>> scala:44)
>>>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(
>>>>> Promise.scala:252)
>>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$
>>>>> pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$
>>>>> pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(
>>>>> BatchingExecutor.scala:55)
>>>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
>>>>> apply$mcV$sp(BatchingExecutor.scala:91)
>>>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
>>>>> apply(BatchingExecutor.scala:91)
>>>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
>>>>> apply(BatchingExecutor.scala:91)
>>>>> at scala.concurrent.BlockContext$.withBlockContext(
>>>>> BlockContext.scala:72)
>>>>> at akka.dispatch.BatchingExecutor$BlockableBatch.run(
>>>>> BatchingExecutor.scala:90)
>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
>>>>> AbstractDispatcher.scala:415)
>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(
>>>>> ForkJoinTask.java:260)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>>>>> runTask(ForkJoinPool.java:1339)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>>>> ForkJoinPool.java:1979)
>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>>>>> ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.util.concurrent.CompletionException:
>>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>> at java.util.concurrent.CompletableFuture.encodeRelay(
>>>>> CompletableFuture.java:326)
>>>>> at java.util.concurrent.CompletableFuture.completeRelay(
>>>>> CompletableFuture.java:338)
>>>>> at java.util.concurrent.CompletableFuture.uniRelay(
>>>>> CompletableFuture.java:911)
>>>>> at java.util.concurrent.CompletableFuture$UniRelay.
>>>>> tryFire(CompletableFuture.java:899)
>>>>> ... 29 more
>>>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>>>> submitJob(Dispatcher.java:254)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(
>>>>> NativeMethodAccessorImpl.java:62)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>>>> DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
>>>>> AkkaRpcActor.java:247)
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.
>>>>> handleRpcMessage(AkkaRpcActor.java:162)
>>>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.
>>>>> handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(
>>>>> AkkaRpcActor.java:142)
>>>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.
>>>>> onReceive(FencedAkkaRpcActor.java:40)
>>>>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(
>>>>> UntypedActor.scala:165)
>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>> ... 4 more
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>> Could not set up JobManager
>>>>> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
>>>>> init>(JobManagerRunner.java:169)
>>>>> at org.apache.flink.runtime.dispatcher.Dispatcher$
>>>>> DefaultJobManagerRunnerFactory.createJobManagerRunner(
>>>>> Dispatcher.java:885)
>>>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>>>> createJobManagerRunner(Dispatcher.java:287)
>>>>> at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(
>>>>> Dispatcher.java:277)
>>>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>>>> persistAndRunJob(Dispatcher.java:262)
>>>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>>>> submitJob(Dispatcher.java:249)
>>>>> ... 21 more
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>> Cannot initialize task 'DataSink (org.apache.flink.api.java.
>>>>> hadoop.mapreduce.HadoopOutputFormat@5ebe8168)': Deserializing the
>>>>> OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.
>>>>> HadoopOutputFormat@5ebe8168) failed: unread block data
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
>>>>> buildGraph(ExecutionGraphBuilder.java:220)
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
>>>>> buildGraph(ExecutionGraphBuilder.java:100)
>>>>> at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(
>>>>> JobMaster.java:1150)
>>>>> at org.apache.flink.runtime.jobmaster.JobMaster.
>>>>> createAndRestoreExecutionGraph(JobMaster.java:1130)
>>>>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(
>>>>> JobMaster.java:298)
>>>>> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
>>>>> init>(JobManagerRunner.java:151)
>>>>> ... 26 more
>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>>> (org.apache.flink.api.java.hadoop.mapreduce.
>>>>> HadoopOutputFormat@5ebe8168) failed: unread block data
>>>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.
>>>>> initializeOnMaster(OutputFormatVertex.java:63)
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
>>>>> buildGraph(ExecutionGraphBuilder.java:216)
>>>>> ... 31 more
>>>>> Caused by: java.lang.IllegalStateException: unread block data
>>>>> at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(
>>>>> ObjectInputStream.java:2781)
>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>>>> at java.io.ObjectInputStream.defaultReadFields(
>>>>> ObjectInputStream.java:2285)
>>>>> at java.io.ObjectInputStream.readSerialData(
>>>>> ObjectInputStream.java:2209)
>>>>> at java.io.ObjectInputStream.readOrdinaryObject(
>>>>> ObjectInputStream.java:2067)
>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>>> InstantiationUtil.java:488)
>>>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>>> InstantiationUtil.java:475)
>>>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>>>> InstantiationUtil.java:463)
>>>>> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
>>>>> InstantiationUtil.java:424)
>>>>> at org.apache.flink.runtime.operators.util.TaskConfig.
>>>>> getStubWrapper(TaskConfig.java:288)
>>>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.
>>>>> initializeOnMaster(OutputFormatVertex.java:60)
>>>>> ... 32 more
>>>>>
>>>>>
>>

Re: Flink 1.5 batch job fails to start

Posted by Alex Vinnik <al...@gmail.com>.
Vino,

I upgraded Flink to Hadoop 2.8.1:

$ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
entrypoint | grep 'Hadoop version'
2018-07-25T00:19:46.142+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
version: 2.8.1

but the job still fails to start.

Ideas?

Caused by: org.apache.flink.util.FlinkException: Failed to submit job
d84cccd3bffcba1f243352a5e5ef99a9.
at
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
set up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at
org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at
org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at
org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
initialize task 'DataSink
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)':
Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
failed: unread block data
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
failed: unread block data
at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more


On Tue, Jul 24, 2018 at 10:32 AM vino yang <ya...@gmail.com> wrote:

> Hi Alex,
>
> Based on your log information, the likely cause is a Hadoop version
> mismatch. To check whether the exception comes from differing Hadoop
> versions, I suggest you match the Hadoop version on both sides.
>
> You can :
>
> 1. Upgrade the Hadoop version that the Flink cluster depends on; Flink's
> official website provides a binary bundled with Hadoop 2.8 [1].
> 2. Downgrade the Hadoop client dependency in your fat jar to match the
> Hadoop version that the Flink cluster depends on.
>
> [1]:
> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz
>
> Thanks, vino.
>
> 2018-07-24 22:59 GMT+08:00 Alex Vinnik <al...@gmail.com>:
>
>> Hi Till,
>>
>> Thanks for responding. Below are the entrypoint logs. One thing I noticed is
>> "Hadoop version: 2.7.3". My fat jar is built with the Hadoop 2.8 client. Could
>> that be the reason for the error? If so, how can I use the same Hadoop version
>> 2.8 on the Flink server side? BTW, the job runs fine locally, reading from the
>> same s3a buckets, when executed using createLocalEnvironment via
>> java -jar my-fat.jar --input s3a://foo --output s3a://bar
>>
>> Regarding java version. The job is submitted via Flink UI, so it should
>> not be a problem.
>>
>> Thanks a lot in advance.
>>
>> 2018-07-24T12:09:38.083+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> --------------------------------------------------------------------------------
>> 2018-07-24T12:09:38.085+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting
>> StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108,
>> Date:24.05.2018 @ 14:54:44 UTC)
>> 2018-07-24T12:09:38.085+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current
>> user: flink
>> 2018-07-24T12:09:38.844+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current
>> Hadoop/Kerberos user: flink
>> 2018-07-24T12:09:38.844+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK
>> 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
>> 2018-07-24T12:09:38.844+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap
>> size: 1963 MiBytes
>> 2018-07-24T12:09:38.844+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME:
>> /docker-java-home/jre
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
>> version: 2.7.3
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> -Dcom.amazonaws.sdk.disableCertChecking
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>> 2018-07-24T12:09:38.851+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program
>> Arguments:
>> 2018-07-24T12:09:38.852+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> --configDir
>> 2018-07-24T12:09:38.852+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> /opt/flink/conf
>> 2018-07-24T12:09:38.852+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> --executionMode
>> 2018-07-24T12:09:38.853+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
>> 2018-07-24T12:09:38.853+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
>> 2018-07-24T12:09:38.853+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
>> 2018-07-24T12:09:38.853+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath:
>> /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
>> 2018-07-24T12:09:38.853+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
>> --------------------------------------------------------------------------------
>> 2018-07-24T12:09:38.854+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered
>> UNIX signal handlers for [TERM, HUP, INT]
>> 2018-07-24T12:09:38.895+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting
>> StandaloneSessionClusterEntrypoint.
>> 2018-07-24T12:09:38.895+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
>> default filesystem.
>> 2018-07-24T12:09:38.927+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
>> security context.
>> 2018-07-24T12:09:39.034+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing
>> cluster services.
>> 2018-07-24T12:09:39.059+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to
>> start actor system at flink-jobmanager:6123
>> 2018-07-24T12:09:40.335+0000
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system
>> started at akka.tcp://flink@flink-jobmanager:6123
>>
>> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Alex,
>>>
>>> I'm not entirely sure what causes this problem because this is the first
>>> time I have seen it.
>>>
>>> My first question would be whether the problem also arises when using a
>>> different Hadoop version.
>>>
>>> Are you using the same Java versions on the client as well as on the
>>> server?
>>>
>>> Could you provide us with the cluster entrypoint logs?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com>
>>> wrote:
>>>
>>>> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink
>>>> 1.5 and getting a weird exception.
>>>>
>>>> Job reads json from s3a and writes parquet files to s3a with avro
>>>> model. Job is uber jar file built with hadoop-aws-2.8.0 in order to have
>>>> access to S3AFileSystem class.
>>>>
>>>> Fails here
>>>>
>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>>>> with
>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>> failed: unread block data
>>>>
>>>> To be exact it fails right on that line.
>>>>
>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>>>
>>>> Not sure how to resolve this problem. Looking for advice. Let me
>>>> know if more info is needed. The full stack trace is below. Thanks.
>>>>
>>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>> at
>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
>>>> at
>>>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>> at
>>>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>> at
>>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>> at
>>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>> at
>>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
>>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> at
>>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>>>> at
>>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>> at
>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>>> at
>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>>> at
>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> at
>>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> at
>>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>>> at
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.util.concurrent.CompletionException:
>>>> org.apache.flink.util.FlinkException: Failed to submit job
>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>> at
>>>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>> at
>>>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>> at
>>>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>> at
>>>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>> ... 29 more
>>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>> at
>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>> ... 4 more
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>>>> not set up JobManager
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>>>> at
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>>>> ... 21 more
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>> Cannot initialize task 'DataSink
>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
>>>> Deserializing the OutputFormat
>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>> failed: unread block data
>>>> at
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>>> at
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>>>> ... 26 more
>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>>> failed: unread block data
>>>> at
>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>>> at
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>>> ... 31 more
>>>> Caused by: java.lang.IllegalStateException: unread block data
>>>> at
>>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>>> at
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>>> at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>>>> at
>>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>>> at
>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>>> ... 32 more
>>>>
>>>>
>

Re: Flink 1.5 batch job fails to start

Posted by vino yang <ya...@gmail.com>.
Hi Alex,

Based on your log information, the likely cause is a Hadoop version
mismatch. To check whether the exception comes from differing Hadoop
versions, I suggest you match the Hadoop version on both sides.

You can :

1. Upgrade the Hadoop version that the Flink cluster depends on; Flink's
official website provides a binary bundled with Hadoop 2.8 [1].
2. Downgrade the Hadoop client dependency in your fat jar to match the
Hadoop version that the Flink cluster depends on.

[1]:
http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz

Thanks, vino.

2018-07-24 22:59 GMT+08:00 Alex Vinnik <al...@gmail.com>:

> Hi Till,
>
> Thanks for responding. Below are the entrypoint logs. One thing I noticed is
> "Hadoop version: 2.7.3". My fat jar is built with the Hadoop 2.8 client. Could
> that be the reason for the error? If so, how can I use the same Hadoop version
> 2.8 on the Flink server side? BTW, the job runs fine locally, reading from the
> same s3a buckets, when executed using createLocalEnvironment via
> java -jar my-fat.jar --input s3a://foo --output s3a://bar
>
> Regarding java version. The job is submitted via Flink UI, so it should
> not be a problem.
>
> Thanks a lot in advance.
>
> 2018-07-24T12:09:38.083+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  ------------------------------------------------------------
> --------------------
> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0,
> Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   OS current user: flink
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Current Hadoop/Kerberos user: flink
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Maximum heap size: 1963 MiBytes
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   JAVA_HOME: /docker-java-home/jre
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Hadoop version: 2.7.3
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   JVM Options:
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO      -Xms2048m
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO      -Xmx2048m
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO      -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.
> disableCertChecking
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO      -Dcom.amazonaws.sdk.disableCertChecking
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO      -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,
> address=5015
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO      -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.
> properties
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO      -Dlogback.configurationFile=file:/opt/flink/conf/logback-
> console.xml
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Program Arguments:
> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO      --configDir
> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO      /opt/flink/conf
> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO      --executionMode
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO      cluster
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO      --host
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO      cluster
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Classpath: /opt/flink/lib/flink-metrics-
> datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.
> jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/
> flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/
> flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.
> 7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  ------------------------------------------------------------
> --------------------
> 2018-07-24T12:09:38.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  Registered UNIX signal handlers for [TERM, HUP, INT]
> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  Starting StandaloneSessionClusterEntrypoint.
> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  Install default filesystem.
> 2018-07-24T12:09:38.927+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  Install security context.
> 2018-07-24T12:09:39.034+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  Initializing cluster services.
> 2018-07-24T12:09:39.059+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  Trying to start actor system at flink-jobmanager:6123
> 2018-07-24T12:09:40.335+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  Actor system started at akka.tcp://flink@flink-jobmanager:6123
>
> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Alex,
>>
>> I'm not entirely sure what causes this problem because this is the first
>> time I have seen it.
>>
>> My first question would be whether the problem also arises when using a
>> different Hadoop version.
>>
>> Are you using the same Java versions on the client as well as on the
>> server?
>>
>> Could you provide us with the cluster entrypoint logs?
>>
>> Cheers,
>> Till
>>
>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com> wrote:
>>
>>> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink
>>> 1.5 and getting a weird exception.
>>>
>>> Job reads json from s3a and writes parquet files to s3a with avro model.
>>> Job is uber jar file built with hadoop-aws-2.8.0 in order to have access to
>>> S3AFileSystem class.
>>>
>>> Fails here
>>> https://github.com/apache/flink/blob/release-1.5.0/
>>> flink-runtime/src/main/java/org/apache/flink/runtime/
>>> operators/util/TaskConfig.java#L288
>>> with
>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>> failed: unread block data
>>>
>>> To be exact it fails right on that line.
>>> https://github.com/apache/flink/blob/release-1.5.0/
>>> flink-core/src/main/java/org/apache/flink/util/
>>> InstantiationUtil.java#L488
>>>
>>> Not sure how to resolve this problem. Looking for advice. Let me know
>>> if more info is needed. The full stack trace is below. Thanks.
>>>
>>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>>> org.apache.flink.util.FlinkException: Failed to submit job
>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$
>>> handleRequest$3(JarRunHandler.java:141)
>>> at java.util.concurrent.CompletableFuture.uniExceptionally(
>>> CompletableFuture.java:870)
>>> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(
>>> CompletableFuture.java:852)
>>> at java.util.concurrent.CompletableFuture.postComplete(
>>> CompletableFuture.java:474)
>>> at java.util.concurrent.CompletableFuture.completeExceptionally(
>>> CompletableFuture.java:1977)
>>> at org.apache.flink.runtime.concurrent.FutureUtils$1.
>>> onComplete(FutureUtils.java:811)
>>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.
>>> execute(Executors.java:83)
>>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.
>>> scala:44)
>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(
>>> Promise.scala:252)
>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$
>>> pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$
>>> pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(
>>> BatchingExecutor.scala:55)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
>>> apply$mcV$sp(BatchingExecutor.scala:91)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
>>> apply(BatchingExecutor.scala:91)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.
>>> apply(BatchingExecutor.scala:91)
>>> at scala.concurrent.BlockContext$.withBlockContext(
>>> BlockContext.scala:72)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch.run(
>>> BatchingExecutor.scala:90)
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
>>> AbstractDispatcher.scala:415)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>>> runTask(ForkJoinPool.java:1339)
>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>> ForkJoinPool.java:1979)
>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>>> ForkJoinWorkerThread.java:107)
>>> Caused by: java.util.concurrent.CompletionException:
>>> org.apache.flink.util.FlinkException: Failed to submit job
>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>> at java.util.concurrent.CompletableFuture.encodeRelay(
>>> CompletableFuture.java:326)
>>> at java.util.concurrent.CompletableFuture.completeRelay(
>>> CompletableFuture.java:338)
>>> at java.util.concurrent.CompletableFuture.uniRelay(
>>> CompletableFuture.java:911)
>>> at java.util.concurrent.CompletableFuture$UniRelay.
>>> tryFire(CompletableFuture.java:899)
>>> ... 29 more
>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>>> 13a1478cbc7ec20f93f9ee0947856bfd.
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>> submitJob(Dispatcher.java:254)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(
>>> NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>> DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
>>> AkkaRpcActor.java:247)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.
>>> handleRpcMessage(AkkaRpcActor.java:162)
>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.
>>> handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(
>>> AkkaRpcActor.java:142)
>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.
>>> onReceive(FencedAkkaRpcActor.java:40)
>>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(
>>> UntypedActor.scala:165)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>> ... 4 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>>> not set up JobManager
>>> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
>>> init>(JobManagerRunner.java:169)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher$
>>> DefaultJobManagerRunnerFactory.createJobManagerRunner(
>>> Dispatcher.java:885)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>> createJobManagerRunner(Dispatcher.java:287)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(
>>> Dispatcher.java:277)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>> persistAndRunJob(Dispatcher.java:262)
>>> at org.apache.flink.runtime.dispatcher.Dispatcher.
>>> submitJob(Dispatcher.java:249)
>>> ... 21 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>> Cannot initialize task 'DataSink (org.apache.flink.api.java.
>>> hadoop.mapreduce.HadoopOutputFormat@5ebe8168)': Deserializing the
>>> OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.
>>> HadoopOutputFormat@5ebe8168) failed: unread block data
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
>>> buildGraph(ExecutionGraphBuilder.java:220)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
>>> buildGraph(ExecutionGraphBuilder.java:100)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(
>>> JobMaster.java:1150)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.
>>> createAndRestoreExecutionGraph(JobMaster.java:1130)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(
>>> JobMaster.java:298)
>>> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
>>> init>(JobManagerRunner.java:151)
>>> ... 26 more
>>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>>> failed: unread block data
>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.
>>> initializeOnMaster(OutputFormatVertex.java:63)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
>>> buildGraph(ExecutionGraphBuilder.java:216)
>>> ... 31 more
>>> Caused by: java.lang.IllegalStateException: unread block data
>>> at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(
>>> ObjectInputStream.java:2781)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>> at java.io.ObjectInputStream.defaultReadFields(
>>> ObjectInputStream.java:2285)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>> at java.io.ObjectInputStream.readOrdinaryObject(
>>> ObjectInputStream.java:2067)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>> InstantiationUtil.java:488)
>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>> InstantiationUtil.java:475)
>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>>> InstantiationUtil.java:463)
>>> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
>>> InstantiationUtil.java:424)
>>> at org.apache.flink.runtime.operators.util.TaskConfig.
>>> getStubWrapper(TaskConfig.java:288)
>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.
>>> initializeOnMaster(OutputFormatVertex.java:60)
>>> ... 32 more
>>>
>>>

Re: Flink 1.5 batch job fails to start

Posted by Alex Vinnik <al...@gmail.com>.
Hi Till,

Thanks for responding. Below are the entrypoint logs. One thing I noticed is
that they report "Hadoop version: 2.7.3", while my fat jar is built with the
Hadoop 2.8 client. Could that be the reason for the error? If so, how can I
use the same Hadoop version (2.8) on the Flink server side? BTW, the job runs
fine locally, reading from the same s3a buckets, when executed using
createLocalEnvironment via java -jar my-fat.jar
--input s3a://foo --output s3a://bar

Regarding the Java version: the job is submitted via the Flink UI, so it
should not be a problem.

Thanks a lot in advance.

2018-07-24T12:09:38.083+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--------------------------------------------------------------------------------
2018-07-24T12:09:38.085+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting
StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108,
Date:24.05.2018 @ 14:54:44 UTC)
2018-07-24T12:09:38.085+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current
user: flink
2018-07-24T12:09:38.844+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current
Hadoop/Kerberos user: flink
2018-07-24T12:09:38.844+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK
64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-24T12:09:38.844+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap
size: 1963 MiBytes
2018-07-24T12:09:38.844+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME:
/docker-java-home/jre
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
version: 2.7.3
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-07-24T12:09:38.851+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program
Arguments:
2018-07-24T12:09:38.852+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--configDir
2018-07-24T12:09:38.852+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
/opt/flink/conf
2018-07-24T12:09:38.852+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--executionMode
2018-07-24T12:09:38.853+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
2018-07-24T12:09:38.853+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath:
/opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
2018-07-24T12:09:38.853+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--------------------------------------------------------------------------------
2018-07-24T12:09:38.854+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered
UNIX signal handlers for [TERM, HUP, INT]
2018-07-24T12:09:38.895+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting
StandaloneSessionClusterEntrypoint.
2018-07-24T12:09:38.895+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
default filesystem.
2018-07-24T12:09:38.927+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
security context.
2018-07-24T12:09:39.034+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing
cluster services.
2018-07-24T12:09:39.059+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to
start actor system at flink-jobmanager:6123
2018-07-24T12:09:40.335+0000
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system
started at akka.tcp://flink@flink-jobmanager:6123

On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <tr...@apache.org> wrote:

> Hi Alex,
>
> I'm not entirely sure what causes this problem because it is the first
> time I see it.
>
> First question would be if the problem also arises if using a different
> Hadoop version.
>
> Are you using the same Java versions on the client as well as on the
> server?
>
> Could you provide us with the cluster entrypoint logs?
>
> Cheers,
> Till
>
> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com> wrote:
>
>> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink 1.5
>> and getting a weird exception.
>>
>> Job reads json from s3a and writes parquet files to s3a with avro model.
>> Job is uber jar file built with hadoop-aws-2.8.0 in order to have access to
>> S3AFileSystem class.
>>
>> Fails here
>>
>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>> with
>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>> failed: unread block data
>>
>> To be exact it fails right on that line.
>>
>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>
>> Not sure how to resolve this problem. Looking for an advice. Let me know
>> if more info is needed. Full stack is below. Thanks.
>>
>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>> org.apache.flink.util.FlinkException: Failed to submit job
>> 13a1478cbc7ec20f93f9ee0947856bfd.
>> at
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
>> at
>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>> at
>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at
>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>> at
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>> at
>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>> at
>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at
>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>> at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>> at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>> at
>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.util.concurrent.CompletionException:
>> org.apache.flink.util.FlinkException: Failed to submit job
>> 13a1478cbc7ec20f93f9ee0947856bfd.
>> at
>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>> at
>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>> at
>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>> at
>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>> ... 29 more
>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>> 13a1478cbc7ec20f93f9ee0947856bfd.
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> ... 4 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>> ... 21 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
>> initialize task 'DataSink
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
>> Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>> ... 26 more
>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>> ... 31 more
>> Caused by: java.lang.IllegalStateException: unread block data
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>> at
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>> at
>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>> ... 32 more
>>
>>

Re: Flink 1.5 batch job fails to start

Posted by Till Rohrmann <tr...@apache.org>.
Hi Alex,

I'm not entirely sure what causes this problem because this is the first time
I have seen it.

The first question would be whether the problem also arises when using a
different Hadoop version.

Are you using the same Java version on the client and on the server?

Could you provide us with the cluster entrypoint logs?

Cheers,
Till

On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <al...@gmail.com> wrote:

> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink 1.5
> and getting a weird exception.
>
> Job reads json from s3a and writes parquet files to s3a with avro model.
> Job is uber jar file built with hadoop-aws-2.8.0 in order to have access to
> S3AFileSystem class.
>
> Fails here
>
> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
> with
> Caused by: java.lang.Exception: Deserializing the OutputFormat
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
> failed: unread block data
>
> To be exact it fails right on that line.
>
> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>
> Not sure how to resolve this problem. Looking for an advice. Let me know
> if more info is needed. Full stack is below. Thanks.
>
> org.apache.flink.runtime.rest.handler.RestHandlerException:
> org.apache.flink.util.FlinkException: Failed to submit job
> 13a1478cbc7ec20f93f9ee0947856bfd.
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
> at akka.dispatch.OnComplete.internal(Future.scala:258)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkException: Failed to submit job
> 13a1478cbc7ec20f93f9ee0947856bfd.
> at
> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
> at
> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
> at
> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
> at
> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
> ... 29 more
> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
> 13a1478cbc7ec20f93f9ee0947856bfd.
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> ... 4 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
> ... 21 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
> initialize task 'DataSink
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)':
> Deserializing the OutputFormat
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
> failed: unread block data
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
> ... 26 more
> Caused by: java.lang.Exception: Deserializing the OutputFormat
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
> failed: unread block data
> at
> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
> ... 31 more
> Caused by: java.lang.IllegalStateException: unread block data
> at
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
> at
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
> at
> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
> ... 32 more
>
>