Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 12:36:10 UTC
[GitHub] [beam] damccorm opened a new issue, #19644: Read TFRecord Files from hdfs will meet exception if file size is large
damccorm opened a new issue, #19644:
URL: https://github.com/apache/beam/issues/19644
Reading TFRecord files stored in HDFS fails with an exception when the files are large.
* Each TFRecord file is larger than 3 GB.
* The total size is larger than 1 TB.
* Using Beam 2.13.0, FlinkRunner 2.13.0, and Java 1.8. I also tested Beam 2.11.0 and 2.12.0 and hit the same problem.
The dependency jar (in build.gradle):
```
dependencies {
    // This dependency is found on compile classpath of this component and consumers.
    //implementation 'com.google.guava:guava:27.0.1-jre'
    compile 'org.apache.beam:beam-sdks-java-core:2.13.0'
    compile 'org.apache.beam:beam-vendor-guava-20_0:0.1'
    compile 'org.tensorflow:tensorflow-hadoop:1.13.1'
    compile 'org.apache.beam:beam-runners-direct-java:2.13.0'
    //implementation "org.apache.beam:beam-sdks-java-core:2.13.0"
    compile "org.apache.beam:beam-runners-flink_2.11:2.13.0"
    compile "org.apache.beam:beam-sdks-java-io-hadoop-file-system:2.13.0"
    compile "org.apache.hadoop:hadoop-common:2.7.3"
    compile "org.apache.hadoop:hadoop-client:2.7.3"
    compile "org.apache.hadoop:hadoop-mapreduce-client-core:2.7.3"
    compile "org.tensorflow:proto:1.13.1"
    compile "org.apache.beam:beam-sdks-java-extensions-sketching:2.13.0"
    // Use JUnit test framework
    testImplementation 'junit:junit:4.12'
}
```
The error message:
```
------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:116)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at avazu.data.transform.App.testTfrecordQIYU(App.java:572)
    at avazu.data.transform.App.main(App.java:744)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    ... 12 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: fe4ce5375efbbb55e56967e4c7a975b2)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:475)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
    ... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
    ... 26 more
Caused by: java.io.IOException: Mismatch of length mask when reading a record. Expected 808268081 but received 1769712859.
    at org.apache.beam.sdk.io.TFRecordIO$TFRecordCodec.read(TFRecordIO.java:651)
    at org.apache.beam.sdk.io.TFRecordIO$TFRecordSource$TFRecordReader.readNextRecord(TFRecordIO.java:530)
    at org.apache.beam.sdk.io.CompressedSource$CompressedReader.readNextRecord(CompressedSource.java:431)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479)
    at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
    at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
    at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:75)
    at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:745)
```
How to fix?
* I have already fixed the bug with the following code.
* I will refine the code and commit it.
```
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 96a753a..484a7cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -631,8 +631,16 @@ public class TFRecordIO {
       int headerBytes = inChannel.read(header);
       if (headerBytes <= 0) {
         return null;
+      } else if (headerBytes != HEADER_LEN) {
+        while (header.hasRemaining() && inChannel.read(header) >= 0) {}
+        if (header.hasRemaining()) {
+          throw new IOException(String.format(
+              "EOF while reading record of length %d. Read only %d bytes. Input might be truncated. Not a valid TFRecord. Fewer than 12 bytes.",
+              HEADER_LEN, header.position()));
+        }
+      } else {
+
       }
-      checkState(headerBytes == HEADER_LEN, "Not a valid TFRecord. Fewer than 12 bytes.");
       header.rewind();
       long length = header.getLong();
@@ -655,7 +663,12 @@ public class TFRecordIO {
       }
       footer.clear();
-      inChannel.read(footer);
+      while (footer.hasRemaining() && inChannel.read(footer) >= 0) {}
+      if (footer.hasRemaining()) {
+        throw new IOException(String.format(
+            "EOF while reading record of length %d. Read only %d bytes. Input might be truncated. Footer error.",
+            FOOTER_LEN, footer.position()));
+      }
       footer.rewind();
       int maskedCrc32OfData = footer.getInt();
```
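The idea behind the diff above is that a single `ReadableByteChannel.read(ByteBuffer)` call is allowed to return fewer bytes than the buffer has room for, so the header and footer reads have to loop until the buffer is full or EOF is reached. Below is a minimal, standalone sketch of that pattern using only the JDK. The class `ReadFully`, the helper `readFully`, and the test channel `chunked` are hypothetical names for illustration; they are not part of Beam, and this is not the code that was committed.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

/** Sketch (not Beam code): fill a buffer from a channel that may return short reads. */
class ReadFully {

  /**
   * Reads until {@code buf} is full. Returns false if the channel was already at EOF
   * (nothing read); throws EOFException if EOF arrives after a partial read.
   * Assumes a blocking channel; a non-blocking channel returning 0 would spin here.
   */
  static boolean readFully(ReadableByteChannel in, ByteBuffer buf) throws IOException {
    int start = buf.position();
    while (buf.hasRemaining()) {
      if (in.read(buf) < 0) {
        break; // EOF
      }
    }
    if (buf.position() == start) {
      return false; // clean EOF, no record to read
    }
    if (buf.hasRemaining()) {
      throw new EOFException(
          String.format(
              "EOF after %d of %d bytes; input might be truncated.",
              buf.position() - start, buf.limit() - start));
    }
    return true;
  }

  /** Hypothetical test channel that hands out at most {@code maxChunk} bytes per read call. */
  static ReadableByteChannel chunked(byte[] data, int maxChunk) {
    final ReadableByteChannel inner = Channels.newChannel(new ByteArrayInputStream(data));
    return new ReadableByteChannel() {
      @Override
      public int read(ByteBuffer dst) throws IOException {
        if (!dst.hasRemaining()) {
          return 0;
        }
        // Limit a view of dst so the inner channel cannot fill it in one call.
        ByteBuffer view = dst.duplicate();
        view.limit(Math.min(dst.limit(), dst.position() + maxChunk));
        int n = inner.read(view);
        if (n > 0) {
          dst.position(dst.position() + n);
        }
        return n;
      }

      @Override
      public boolean isOpen() {
        return inner.isOpen();
      }

      @Override
      public void close() throws IOException {
        inner.close();
      }
    };
  }
}
```

With a 12-byte header and a channel that returns at most 5 bytes per call, one plain `read` leaves the buffer partly filled, while `readFully` keeps reading until all 12 bytes are present. A partly filled header parsed as if it were complete is the kind of situation that could surface as the "Mismatch of length mask" error in the trace above.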
Imported from Jira [BEAM-7695](https://issues.apache.org/jira/browse/BEAM-7695). Original Jira may contain additional context.
Reported by: silenceli.