You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by su...@apache.org on 2023/02/20 17:41:04 UTC
[spark] branch master updated: [SPARK-41952][SQL] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160
This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1688a8768fb [SPARK-41952][SQL] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160
1688a8768fb is described below
commit 1688a8768fb34060548f8790e77f645027f65db2
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Mon Feb 20 09:40:44 2023 -0800
[SPARK-41952][SQL] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160
### What changes were proposed in this pull request?
SPARK-41952 was raised a while ago, but unfortunately the Parquet community has not yet published a patched version. As a workaround, we can fix the issue on the Spark side first.
We encountered this memory issue when migrating data from parquet/snappy to parquet/zstd: Spark executors consistently occupied an unreasonable amount of off-heap memory and were at high risk of being killed by the YARN NodeManager (NM).
See more discussions at https://github.com/apache/parquet-mr/pull/982 and https://github.com/apache/iceberg/pull/5681
### Why are the changes needed?
The issue has been fixed upstream in Parquet ([PARQUET-2160](https://issues.apache.org/jira/browse/PARQUET-2160)), but a release containing the fix is not available yet.
### Does this PR introduce _any_ user-facing change?
Yes, it's a bug fix.
### How was this patch tested?
The existing UTs should cover correctness. I also verified this patch by scanning a large parquet/zstd table.
```
spark-shell --executor-cores 4 --executor-memory 6g --conf spark.executor.memoryOverhead=2g
```
```
spark.sql("select sum(hash(*)) from parquet_zstd_table ").show(false)
```
- before this patch
All executors are quickly killed by the NM.
```
ERROR YarnScheduler: Lost executor 1 on hadoop-xxxx.****.org: Container killed by YARN for exceeding physical memory limits. 8.2 GB of 8 GB physical memory used. Consider boosting spark.executor.memoryOverhead.
```
<img width="1872" alt="image" src="https://user-images.githubusercontent.com/26535726/220031678-e9060244-5586-4f0c-8fe7-55bb4e20a580.png">
- after this patch
The query runs fine and no executors are killed.
<img width="1881" alt="image" src="https://user-images.githubusercontent.com/26535726/220031917-4fe38c07-b38f-49c6-a982-2091a6c2a8ed.png">
Closes #40091 from pan3793/SPARK-41952.
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Chao Sun <su...@apple.com>
---
.../datasources/parquet/ParquetCodecFactory.java | 112 +++++++++++++++++++++
.../parquet/SpecificParquetRecordReaderBase.java | 2 +
2 files changed, 114 insertions(+)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java
new file mode 100644
index 00000000000..2edbdc70da2
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.ZstandardCodec;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * A {@link CodecFactory} used when reading from Parquet. It adds a workaround for the off-heap
+ * memory issues encountered when reading zstd-compressed files. For details, see
+ * <a href="https://issues.apache.org/jira/browse/PARQUET-2160">PARQUET-2160</a>.
+ *
+ * TODO: Remove this workaround after upgrading to a Parquet release that includes PARQUET-2160.
+ */
+public class ParquetCodecFactory extends CodecFactory {
+
+  /**
+   * Creates a codec factory; {@code configuration} and {@code pageSize} are forwarded unchanged
+   * to {@link CodecFactory}.
+   */
+  public ParquetCodecFactory(Configuration configuration, int pageSize) {
+    super(configuration, pageSize);
+  }
+
+  /**
+   * Copied and modified from CodecFactory.HeapBytesDecompressor: zstd streams are read fully
+   * into the heap and then closed eagerly instead of being left open for lazy consumption.
+   */
+  @SuppressWarnings("deprecation")
+  class HeapBytesDecompressor extends BytesDecompressor {
+
+    // When getCodec returns null, decompressor is null too and decompress passes input through.
+    private final CompressionCodec codec;
+    private final Decompressor decompressor;
+
+    HeapBytesDecompressor(CompressionCodecName codecName) {
+      this.codec = getCodec(codecName);
+      this.decompressor = codec != null ? CodecPool.getDecompressor(codec) : null;
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      if (codec == null) {
+        return bytes;
+      }
+      if (decompressor != null) {
+        decompressor.reset();
+      }
+      InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
+      if (!(codec instanceof ZstandardCodec)) {
+        return BytesInput.from(is, uncompressedSize);
+      }
+      // Explicitly close the ZstdDecompressorStream to release the resources it holds and avoid
+      // the off-heap memory fragmentation issue, see PARQUET-2160. This reads the decompressed
+      // page into the heap eagerly rather than lazily; since the problem only happens with the
+      // ZSTD codec, only ZSTD streams are handled this way. try-with-resources also closes the
+      // stream when reading fails, so the native resources are not leaked on error paths.
+      try (InputStream zstdStream = is) {
+        return BytesInput.copy(BytesInput.from(zstdStream, uncompressedSize));
+      }
+    }
+
+    // Delegates to the BytesInput overload above; compressedSize is not used.
+    @Override
+    public void decompress(
+        ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+        throws IOException {
+      ByteBuffer decompressed =
+          decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
+      output.put(decompressed);
+    }
+
+    @Override
+    public void release() {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
+    }
+  }
+
+  // Route every decompressor created by this factory through the zstd-aware class above.
+  @Override
+  @SuppressWarnings("deprecation")
+  protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
+    return new HeapBytesDecompressor(codecName);
+  }
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 8cefa589c0e..678b287a5e3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -96,6 +96,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
ParquetReadOptions options = HadoopReadOptions
.builder(configuration, file)
.withRange(split.getStart(), split.getStart() + split.getLength())
+ .withCodecFactory(new ParquetCodecFactory(configuration, 0))
.build();
ParquetFileReader fileReader = new ParquetFileReader(
HadoopInputFile.fromPath(file, configuration), options);
@@ -159,6 +160,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
ParquetReadOptions options = HadoopReadOptions
.builder(config, file)
.withRange(0, length)
+ .withCodecFactory(new ParquetCodecFactory(config, 0))
.build();
ParquetFileReader fileReader = ParquetFileReader.open(
HadoopInputFile.fromPath(file, config), options);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org