Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2019/07/02 00:13:13 UTC

Random errors reading binary files in batch workflow

Hi all,

My latest issue is that I regularly (but not always) get a java.io.UTFDataFormatException when reading serialized records back in.

I can re-run the exact same workflow, on the same cluster, with the same input data, and sometimes it works.

It seems that the higher the parallelism, the more likely an error is to occur.

The fact that it sometimes succeeds suggests this isn't a problem with corrupted records (previously written out by an upstream workflow), since corruption would cause a consistent failure.

The error occurs when reading from both S3 and HDFS.

When the error occurs, it looks like this (it fails while deserializing the first field of the POJO):

2019-07-01 22:12:02,542 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSource (feature vector source) (36/64) (577f1375e15df4a5352a405fb8b21204) switched from RUNNING to FAILED.
java.io.UTFDataFormatException: malformed input around byte 2
	at java.io.DataInputStream.readUTF(DataInputStream.java:634)
	at java.io.DataInputStream.readUTF(DataInputStream.java:564)
	at com.adbeat.similarity.FeatureVectorWithCountry.read(FeatureVectorWithCountry.java:47)
	at org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:39)
	at org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:32)
	at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:305)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

FeatureVectorWithCountry is a POJO that implements the IOReadableWritable interface.
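
The read()/write() methods follow the usual IOReadableWritable pattern. A simplified sketch of what the class looks like (the field names here are placeholders, not the actual fields):

import java.io.IOException;

import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public class FeatureVectorWithCountry implements IOReadableWritable {

    // Placeholder fields - the real class has more/different fields.
    private String countryCode;
    private float[] features;

    @Override
    public void write(DataOutputView out) throws IOException {
        // writeUTF() emits a two-byte length prefix followed by modified UTF-8 bytes.
        out.writeUTF(countryCode);
        out.writeInt(features.length);
        for (float f : features) {
            out.writeFloat(f);
        }
    }

    @Override
    public void read(DataInputView in) throws IOException {
        // This is the readUTF() call that throws the UTFDataFormatException.
        countryCode = in.readUTF();
        features = new float[in.readInt()];
        for (int i = 0; i < features.length; i++) {
            features[i] = in.readFloat();
        }
    }
}

Since readUTF() reads a two-byte length prefix and then validates the following bytes as modified UTF-8, a read that starts anywhere other than a true record boundary will fail almost immediately, which is consistent with "malformed input around byte 2".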

It also sometimes fails while reading a different POJO, which is in a different input DataSet in the same workflow:

2019-07-01 00:39:05,829 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSource (at createWorkflow(AdvertiserSimilarityWorkflow.java:88) (org.apache.flink.api.common.io.SerializedInputFormat)) (17/48) (021bc0011dd523a4314d4e52f97a2486) switched from RUNNING to FAILED.
java.io.UTFDataFormatException: malformed input around byte 50
	at java.io.DataInputStream.readUTF(DataInputStream.java:656)
	at java.io.DataInputStream.readUTF(DataInputStream.java:564)
	at com.adbeat.similarity.advertiser.AdText.read(AdText.java:170)
	at org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:39)
	at org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:32)
	at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:305)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

I don’t see any preceding errors in the logs.

It seems like the calculation of valid starting offsets within a split is sometimes wrong, so reading starts at an incorrect location in the file.
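
For reference, the source side is set up roughly like this (paths simplified; this is a minimal sketch of the pattern, not the actual workflow code):

import org.apache.flink.api.common.io.SerializedInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ReadFeatureVectors {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Each parallel instance of the DataSource gets one or more splits of
        // these files; whether the failure shows up depends on which offset
        // each split starts reading from.
        SerializedInputFormat<FeatureVectorWithCountry> format = new SerializedInputFormat<>();
        format.setFilePath("hdfs:///path/to/feature-vectors");  // fails the same way with s3:// paths

        DataSet<FeatureVectorWithCountry> vectors =
            env.createInput(format, TypeInformation.of(FeatureVectorWithCountry.class));

        vectors.first(10).print();
    }
}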

Has anyone else run into this?

Thanks,

— Ken

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra