Posted to dev@parquet.apache.org by "Artem Shnayder (Jira)" <ji...@apache.org> on 2020/10/07 19:50:00 UTC

[jira] [Created] (PARQUET-1919) Buffer int overflow in CapacityByteArrayOutputStream, SnappyCompressor

Artem Shnayder created PARQUET-1919:
---------------------------------------

             Summary: Buffer int overflow in CapacityByteArrayOutputStream, SnappyCompressor
                 Key: PARQUET-1919
                 URL: https://issues.apache.org/jira/browse/PARQUET-1919
             Project: Parquet
          Issue Type: Bug
          Components: parquet-mr
    Affects Versions: 1.10.1
            Reporter: Artem Shnayder


During an attempted write operation, a buffer position integer overflow results in the exception IllegalArgumentException: Negative capacity: -2147336621.

 
{noformat}
20/10/06 15:30:39 INFO HadoopRDD: Input split: s3a://<prefix>/part-00015-96362e5d-d047-4f31-812b-38ff79f6919c-c000.txt.bz2:268435456+33554432
20/10/06 17:23:37 ERROR Utils: Aborting task
java.lang.IllegalArgumentException: Negative capacity: -2147336621
	at java.nio.Buffer.<init>(Buffer.java:199)
	at java.nio.ByteBuffer.<init>(ByteBuffer.java:281)
	at java.nio.ByteBuffer.<init>(ByteBuffer.java:289)
	at java.nio.MappedByteBuffer.<init>(MappedByteBuffer.java:89)
	at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:119)
	at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
	at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
	at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
	at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
	at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
	at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
	at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
	at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
	at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
	at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
	at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
	at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{noformat}
2,147,483,647 (max int) - 2,147,336,621 (magnitude of the negative capacity) = 147,026.
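
The same arithmetic can be checked from the other direction. The sketch below assumes the negative capacity came from a single wrap of a 32-bit int addition, which the trace does not confirm. Under that assumption it recovers the capacity that was presumably requested and how far it exceeds Integer.MAX_VALUE. The class and method names are hypothetical and are not part of parquet-mr.

```java
class NegativeCapacityArithmetic {
    // Value reported in the exception message of this issue.
    static final int REPORTED_CAPACITY = -2147336621;

    // Assumption: the int wrapped exactly once, so reading its bits as an
    // unsigned 32-bit value gives back the sum that was intended.
    static long intendedCapacity(int wrapped) {
        return Integer.toUnsignedLong(wrapped);
    }

    // How many bytes the intended capacity lies beyond the largest int.
    static long excessOverMaxInt(int wrapped) {
        return intendedCapacity(wrapped) - Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println("intended capacity (bytes): " + intendedCapacity(REPORTED_CAPACITY));
        System.out.println("bytes past Integer.MAX_VALUE: " + excessOverMaxInt(REPORTED_CAPACITY));
    }
}
```

If the single-wrap assumption holds, the requested buffer was only about 147 KB larger than the 2 GiB int limit.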

The input bz2 files are all roughly 900 MiB in size. The target parquet part files are 1.7 GiB in size.

Increasing the partition count from 64 to 1024 makes the error go away. The output parquet part files drop to 100 MiB in size.

However, it's unclear to me what the root cause is and why increasing the partition count helps. Was it an unlucky row grouping that bumped the buffer size over the limit by 147 KB, i.e., would any change in partition count, up or down, have helped? Or is the write approaching the parquet part file size limit?

This issue seems related to PARQUET-1632 but it's not using the ConcatenatingByteArrayCollector, which potentially means a distinct root cause. The input dataset does have large string columns (up to 10MB) but nothing close to the signed int max of 2.4G that was produced in PARQUET-1632.
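
One way a buffer could overflow without any single value being close to the int limit is accumulation. The trace ends in ByteBuffer.allocateDirect called from SnappyCompressor.setInput. If the requested capacity is the sum of the bytes already buffered and the length of the incoming write (an assumption, not verified against the 1.10.1 source), an unchecked int addition would wrap once the running total passes Integer.MAX_VALUE. A minimal sketch of a checked version follows. The helper name and the sample values are hypothetical.

```java
class CapacityCheck {
    // Hypothetical helper: compute position + len, but fail with a clear
    // message instead of silently wrapping to a negative int.
    static int requiredCapacity(int position, int len) {
        try {
            return Math.addExact(position, len);
        } catch (ArithmeticException e) {
            throw new IllegalStateException(
                "Buffered input would exceed Integer.MAX_VALUE bytes: position="
                    + position + ", len=" + len, e);
        }
    }

    public static void main(String[] args) {
        int position = 2_147_000_000;   // made-up: bytes already buffered
        int len = 10 * 1024 * 1024;     // made-up: one more 10 MiB write

        // Unchecked int addition wraps around to a negative number.
        System.out.println("unchecked sum: " + (position + len));

        // The checked helper reports the problem explicitly.
        try {
            requiredCapacity(position, len);
        } catch (IllegalStateException e) {
            System.out.println("checked: " + e.getMessage());
        }
    }
}
```

A check like this would not make the write succeed, but it would replace the Negative capacity error with a message that points at the accumulated buffer size.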

--
This message was sent by Atlassian Jira
(v8.3.4#803005)