Posted to commits@hudi.apache.org by "ZiyueGuan (Jira)" <ji...@apache.org> on 2021/11/27 08:34:00 UTC
[jira] [Created] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption
ZiyueGuan created HUDI-2875:
-------------------------------
Summary: Concurrent call to HoodieMergeHandler cause parquet corruption
Key: HUDI-2875
URL: https://issues.apache.org/jira/browse/HUDI-2875
Project: Apache Hudi
Issue Type: Bug
Components: Common Core
Reporter: ZiyueGuan
Problem:
Some corrupted parquet files are generated and exceptions will be thrown when read.
e.g.
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file <FilePath>
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
... 4 more
Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col required binary col
at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
at org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
at org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
at org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:353)
at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
... 11 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
How to reproduce:
We need a way to interrupt one task without shutting down the JVM; Spark speculation is one such mechanism. Once speculation is triggered, other tasks running at the same time or later risk generating corrupt parquet files. Roughly half of the affected tasks throw an exception, while a few succeed without any warning sign.
Root cause:
ParquetWriter is not thread-safe; its caller must guarantee that there are no concurrent calls to it. In the following code:
https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103
we call write and close on the parquet writer concurrently. Parquet writers draw from a pool of Compressor objects used to hold compressed bytes. A writer closed while a write is still in flight cannot fully reset its compressor before returning it to the pool, so any later task that reuses the dirty compressor may generate corrupt data.
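To illustrate the fix pattern (this is a minimal sketch, not Hudi's actual code; the class and its fields are hypothetical stand-ins for a ParquetWriter and its pooled compressor state): serializing write and close on a shared monitor guarantees close can never interleave with an in-flight write, and a write that races in after close fails fast instead of touching already-released state.

```java
import java.util.ArrayList;
import java.util.List;

public class GuardedWriter {
    // Stands in for the writer's internal buffers / pooled compressor state.
    private final List<String> sink = new ArrayList<>();
    private boolean closed = false;

    // All mutation happens under the monitor, so a concurrent close()
    // can never interleave with an in-flight write().
    public synchronized void write(String record) {
        if (closed) {
            throw new IllegalStateException("write after close");
        }
        sink.add(record);
    }

    public synchronized void close() {
        closed = true;
        // A real writer would flush its pages here and return the
        // compressor to the pool exactly once, fully reset.
    }

    public synchronized int recordCount() {
        return sink.size();
    }

    public static void main(String[] args) {
        GuardedWriter w = new GuardedWriter();
        w.write("a");
        w.write("b");
        w.close();
        boolean rejected = false;
        try {
            w.write("c"); // a racing write arriving after close is rejected
        } catch (IllegalStateException e) {
            rejected = true;
        }
        System.out.println(w.recordCount() + " " + rejected);
    }
}
```

The same effect could be achieved with a ReentrantLock or by routing all writer calls through a single thread; the essential point is that write and close must never execute concurrently.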
Unfortunately, I have not yet found a good way to reproduce this in a small test case. Validation is being done in a real Hudi ingestion job.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)