Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 12:36:10 UTC

[GitHub] [beam] damccorm opened a new issue, #19644: Read TFRecord Files from hdfs will meet exception if file size is large

damccorm opened a new issue, #19644:
URL: https://github.com/apache/beam/issues/19644

   Reading TFRecord files stored in HDFS fails with an error under these conditions:
    * A single TFRecord file is larger than 3 GB.
    * The total size is larger than 1 TB.
    * Using Beam 2.13.0, FlinkRunner 2.13.0, and Java 1.8. I also tested 2.11.0 and 2.12.0 and hit the same problem.
   
   The dependencies (in build.gradle):
   ```
   
   dependencies {
       // This dependency is found on compile classpath of this component and consumers.
       //implementation 'com.google.guava:guava:27.0.1-jre'
       compile 'org.apache.beam:beam-sdks-java-core:2.13.0'
       compile 'org.apache.beam:beam-vendor-guava-20_0:0.1'
       compile 'org.tensorflow:tensorflow-hadoop:1.13.1'
       compile 'org.apache.beam:beam-runners-direct-java:2.13.0'
       //implementation "org.apache.beam:beam-sdks-java-core:2.13.0"
       compile "org.apache.beam:beam-runners-flink_2.11:2.13.0"
       compile "org.apache.beam:beam-sdks-java-io-hadoop-file-system:2.13.0"
       compile "org.apache.hadoop:hadoop-common:2.7.3"
       compile "org.apache.hadoop:hadoop-client:2.7.3"
       compile "org.apache.hadoop:hadoop-mapreduce-client-core:2.7.3"
       compile "org.tensorflow:proto:1.13.1"
       compile "org.apache.beam:beam-sdks-java-extensions-sketching:2.13.0"

       // Use JUnit test framework
       testImplementation 'junit:junit:4.12'
   }
   ```
   
   The error message:
   
    
   ```
   
   ------------------------------------------------------------
    The program finished with the following exception:

   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
       at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
       at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
       at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
       at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
       at java.security.AccessController.doPrivileged(Native Method)
       at javax.security.auth.Subject.doAs(Subject.java:422)
       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
       at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
   Caused by: java.lang.RuntimeException: Pipeline execution failed
       at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:116)
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
       at avazu.data.transform.App.testTfrecordQIYU(App.java:572)
       at avazu.data.transform.App.main(App.java:744)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:498)
       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
       ... 12 more
   Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: fe4ce5375efbbb55e56967e4c7a975b2)
       at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:475)
       at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
       at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139)
       at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
       ... 21 more
   Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
       at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
       at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
       ... 26 more
   Caused by: java.io.IOException: Mismatch of length mask when reading a record. Expected 808268081 but received 1769712859.
       at org.apache.beam.sdk.io.TFRecordIO$TFRecordCodec.read(TFRecordIO.java:651)
       at org.apache.beam.sdk.io.TFRecordIO$TFRecordSource$TFRecordReader.readNextRecord(TFRecordIO.java:530)
       at org.apache.beam.sdk.io.CompressedSource$CompressedReader.readNextRecord(CompressedSource.java:431)
       at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484)
       at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479)
       at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
       at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
       at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:75)
       at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
       at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
       at java.lang.Thread.run(Thread.java:745)
   
   ```
   
   How to fix?
    * I have already fixed the bug with the following code.
    * I will refine the code below and submit it.
   
    
   ```
   
   diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
   index 96a753a..484a7cb 100644
   --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
   +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
   @@ -631,8 +631,16 @@ public class TFRecordIO {
          int headerBytes = inChannel.read(header);
          if (headerBytes <= 0) {
            return null;
   +      } else if (headerBytes != HEADER_LEN) {
   +        while (header.hasRemaining() && inChannel.read(header) >= 0) {}
   +        if (header.hasRemaining()) {
   +          throw new IOException(String.format(
   +              "EOF while reading record of length %d. Read only %d bytes. Input might be truncated. Not a valid TFRecord. Fewer than 12 bytes.",
   +              HEADER_LEN, header.position()));
   +        }
   +      } else {
   +
          }
   -      checkState(headerBytes == HEADER_LEN, "Not a valid TFRecord. Fewer than 12 bytes.");

          header.rewind();
          long length = header.getLong();
   @@ -655,7 +663,12 @@ public class TFRecordIO {
          }

          footer.clear();
   -      inChannel.read(footer);
   +      while (footer.hasRemaining() && inChannel.read(footer) >= 0) {}
   +      if (footer.hasRemaining()) {
   +        throw new IOException(String.format(
   +            "EOF while reading record of length %d. Read only %d bytes. Input might be truncated. Footer error.",
   +            FOOTER_LEN, footer.position()));
   +      }
          footer.rewind();

          int maskedCrc32OfData = footer.getInt();

   ```
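
The patch above rests on the `ReadableByteChannel.read` contract: a single call may return fewer bytes than the buffer has room for, so a fixed-size header or footer has to be read in a loop. The following standalone sketch illustrates that idea only; it is not the Beam code. `ReadFullySketch`, `readFully`, and `ChunkedChannel` are hypothetical names introduced for illustration, and the 5-byte chunk size is an assumption standing in for whatever short reads the HDFS channel produced in the reported job.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Illustrative sketch only; none of these names exist in Beam.
class ReadFullySketch {

  /**
   * Fills buf completely, looping because one read() may deliver only part of
   * the requested bytes. Throws EOFException if the channel ends first.
   * Assumes a blocking channel; a channel that keeps returning 0 would spin.
   */
  static void readFully(ReadableByteChannel in, ByteBuffer buf) throws IOException {
    while (buf.hasRemaining()) {
      if (in.read(buf) < 0) {
        throw new EOFException(
            String.format(
                "Expected %d bytes but read only %d; input might be truncated.",
                buf.limit(), buf.position()));
      }
    }
  }

  /** Hypothetical test channel that hands out at most maxChunk bytes per read(). */
  static final class ChunkedChannel implements ReadableByteChannel {
    private final ByteBuffer data;
    private final int maxChunk;
    private boolean open = true;

    ChunkedChannel(byte[] bytes, int maxChunk) {
      this.data = ByteBuffer.wrap(bytes);
      this.maxChunk = maxChunk;
    }

    @Override
    public int read(ByteBuffer dst) {
      if (!data.hasRemaining()) {
        return -1; // end of stream
      }
      int n = Math.min(maxChunk, Math.min(dst.remaining(), data.remaining()));
      for (int i = 0; i < n; i++) {
        dst.put(data.get());
      }
      return n;
    }

    @Override
    public boolean isOpen() {
      return open;
    }

    @Override
    public void close() {
      open = false;
    }
  }

  public static void main(String[] args) throws IOException {
    // 12 matches the "Fewer than 12 bytes" header check quoted in the patch.
    final int headerLen = 12;
    byte[] bytes = new byte[headerLen];

    // A single read() stops at the chunk boundary and leaves the header incomplete.
    ByteBuffer single = ByteBuffer.allocate(headerLen);
    int n = new ChunkedChannel(bytes, 5).read(single);
    System.out.println("single read(): " + n + " of " + headerLen + " bytes");

    // Looping until the buffer is full recovers the whole header.
    ByteBuffer full = ByteBuffer.allocate(headerLen);
    readFully(new ChunkedChannel(bytes, 5), full);
    System.out.println("readFully():   " + full.position() + " of " + headerLen + " bytes");
  }
}
```

With the 5-byte chunk assumed here, the single `read()` delivers 5 of the 12 header bytes while `readFully` delivers all 12. Parsing a partially filled header would plausibly yield a garbage length and CRC, which is consistent with the "Mismatch of length mask" error in the trace, though the issue itself does not confirm the exact cause.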
   
    
   
   Imported from Jira [BEAM-7695](https://issues.apache.org/jira/browse/BEAM-7695). Original Jira may contain additional context.
   Reported by: silenceli.

