Posted to issues@flink.apache.org by "Zhu Zhu (Jira)" <ji...@apache.org> on 2019/09/10 04:17:00 UTC

[jira] [Comment Edited] (FLINK-14037) Deserializing the input/output formats failed: unread block data

    [ https://issues.apache.org/jira/browse/FLINK-14037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16926317#comment-16926317 ] 

Zhu Zhu edited comment on FLINK-14037 at 9/10/19 4:16 AM:
----------------------------------------------------------

Hi [~liupengcheng], the _flink-hadoop-compatibility_ artifact should be used by your app in compile scope so that it is part of the user code; it does not need to be in _flink-dist_.
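
For illustration, a minimal Maven dependency declaration for this could look like the sketch below (the Scala suffix and version are assumptions based on your environment; getting the artifact into the user jar also requires a fat-jar build, e.g. the shade plugin):
{code:xml}
<!-- Hypothetical example: compile scope, so the artifact ships with the user code
     rather than with flink-dist -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.9.0</version>
    <scope>compile</scope>
</dependency>
{code}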

The root cause of this issue is that some user-defined objects failed to be deserialized, which may be caused by a class version mismatch. (I suspect it's the Hadoop classes.)

Would you try deploying the pre-bundled Hadoop jar ([e.g.|https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-7.0/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar]) to your flink-dist/lib to see if it solves your problem? [Ref|https://flink.apache.org/downloads.html]
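
For example, a step along these lines (the lib path is an assumption for a standard installation) would put the uber jar on the cluster classpath:
{noformat}
# Hypothetical example: download the shaded Hadoop uber jar into flink-dist's lib directory
cd $FLINK_HOME/lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-7.0/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar
{noformat}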


> Deserializing the input/output formats failed: unread block data
> ----------------------------------------------------------------
>
>                 Key: FLINK-14037
>                 URL: https://issues.apache.org/jira/browse/FLINK-14037
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0
>         Environment: flink 1.9.0
> app jar uses `flink-shaded-hadoop-2` dependencies to avoid some conflicts
>  
>            Reporter: liupengcheng
>            Priority: Major
>
> Recently, we encountered the following issue when testing flink 1.9.0:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 8ffbc071dda81d6f8005c02be8adde6b)
> 	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
> 	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> 	at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:539)
> 	at com.github.ehiggs.spark.terasort.FlinkTeraSort$.main(FlinkTeraSort.scala:89)
> 	at com.github.ehiggs.spark.terasort.FlinkTeraSort.main(FlinkTeraSort.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> 	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> 	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> 	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> 	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> 	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886)
> 	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> 	at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
> 	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> 	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> 	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> 	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
> 	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> 	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> 	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> 	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> 	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
> 	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> 	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> 	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> 	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> 	at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> 	... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> 	at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
> 	at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
> 	at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> 	... 7 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.createInput(ExecutionEnvironment.scala:390) (org.apache.flink.api.scala.hadoop.mapreduce.HadoopInpu)': Loading the input/output formats failed: org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat@2e179f3e
> 	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:218)
> 	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
> 	at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
> 	at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
> 	at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
> 	at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
> 	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
> 	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
> 	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
> 	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
> 	at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
> 	... 10 more
> Caused by: java.lang.Exception: Loading the input/output formats failed: org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat@2e179f3e
> 	at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:156)
> 	at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:60)
> 	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:214)
> 	... 20 more
> Caused by: java.lang.RuntimeException: Deserializing the input/output formats failed: unread block data
> 	at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68)
> 	at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:153)
> 	... 22 more
> Caused by: java.lang.IllegalStateException: unread block data
> 	at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> 	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> 	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
> 	at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
> 	... 23 more
> End of exception on server side>]
> 	at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
> 	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
> 	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
> 	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> 	... 4 more
> {code}
> I checked the classpath:
> {noformat}
> Classpath: lib/flink-table-blink_2.11-1.9.0-mdh1.9.0.0-SNAPSHOT.jar:lib/flink-table_2.11-1.9.0-mdh1.9.0.0-SNAPSHOT.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.15.jar:log4j.properties:logback.xml:plugins/README.txt:flink.jar:flink-conf.yaml: {OTHER HADOOP CLASSPATH}{noformat}
> Our app code:
> {code:java}
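>     // Imports are elided in this excerpt; it assumes roughly the following
>     // (package names from Flink 1.9 and the spark-terasort project; treat as assumptions):
>     //   import org.apache.flink.api.scala._
>     //   import org.apache.flink.api.common.operators.Order
>     //   import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
>     //   import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
>     //   import org.apache.hadoop.mapred.JobConf
>     //   import org.apache.hadoop.mapreduce.Job
>     //   import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
>     //   import org.apache.hadoop.fs.Path
>     //   import com.github.ehiggs.spark.terasort.{TeraInputFormat, TeraOutputFormat}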
>     val env = ExecutionEnvironment.getExecutionEnvironment
> //    env.getConfig.enableObjectReuse()
>     val dataSet = env.createInput(HadoopInputs.readHadoopFile(
>       new TeraInputFormat, classOf[Array[Byte]], classOf[Array[Byte]], inputFile))
>       .partitionCustom(new FlinkTeraSortPartitioner(new TeraSortPartitioner(partitions)), 0)
>       .sortPartition(0, Order.ASCENDING)
>     val job = Job.getInstance(new JobConf)
>     val outputFormat = new HadoopOutputFormat[Array[Byte], Array[Byte]](
>       new TeraOutputFormat, job)
>     FileOutputFormat.setOutputPath(job, new Path(outputFile))
>     dataSet.output(outputFormat)
>     env.execute("TeraSort")
> {code}
> I'm trying to find out the root cause; for now, I suspect that we are missing the `flink-hadoop-compatibility_2.11` package under the flink-dist directory.
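> 
> For reference, a quick way to check this (assuming a standard flink-dist layout; adjust FLINK_HOME to your setup) is:
> {noformat}
> ls $FLINK_HOME/lib | grep -i hadoop
> {noformat}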
>  
> What's more, I think we should provide more useful information to users rather than messages like this one, which are hard to understand.
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)