You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by su...@apache.org on 2023/02/20 17:45:12 UTC

[spark] branch branch-3.4 updated: [SPARK-41952][SQL] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 4560d4c4f75 [SPARK-41952][SQL] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160
4560d4c4f75 is described below

commit 4560d4c4f758abb87631195f60d9c293e3d5b2a1
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Mon Feb 20 09:40:44 2023 -0800

    [SPARK-41952][SQL] Fix Parquet zstd off-heap memory leak as a workaround for PARQUET-2160
    
    ### What changes were proposed in this pull request?
    
    SPARK-41952 was raised for a while, but unfortunately, the Parquet community does not publish the patched version yet, as a workaround, we can fix the issue on the Spark side first.
    
    We encountered this memory issue when migrating data from parquet/snappy to parquet/zstd, Spark executors always occupy unreasonable off-heap memory and have a high risk of being killed by NM.
    
    See more discussions at https://github.com/apache/parquet-mr/pull/982 and https://github.com/apache/iceberg/pull/5681
    
    ### Why are the changes needed?
    
    The issue is fixed in the parquet community [PARQUET-2160](https://issues.apache.org/jira/browse/PARQUET-2160), but the patched version is not available yet.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it's bug fix.
    
    ### How was this patch tested?
    
    The existing UT should cover the correctness check, I also verified this patch by scanning a large parquet/zstd table.
    
    ```
    spark-shell --executor-cores 4 --executor-memory 6g --conf spark.executor.memoryOverhead=2g
    ```
    
    ```
    spark.sql("select sum(hash(*)) from parquet_zstd_table ").show(false)
    ```
    
    - before this patch
    
    All executors get killed by NM quickly.
    ```
    ERROR YarnScheduler: Lost executor 1 on hadoop-xxxx.****.org: Container killed by YARN for exceeding physical memory limits. 8.2 GB of 8 GB physical memory used. Consider boosting spark.executor.memoryOverhead.
    ```
    <img width="1872" alt="image" src="https://user-images.githubusercontent.com/26535726/220031678-e9060244-5586-4f0c-8fe7-55bb4e20a580.png">
    
    - after this patch
    
    Query runs well, no executor gets killed.
    
    <img width="1881" alt="image" src="https://user-images.githubusercontent.com/26535726/220031917-4fe38c07-b38f-49c6-a982-2091a6c2a8ed.png">
    
    Closes #40091 from pan3793/SPARK-41952.
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Chao Sun <su...@apple.com>
---
 .../datasources/parquet/ParquetCodecFactory.java   | 112 +++++++++++++++++++++
 .../parquet/SpecificParquetRecordReaderBase.java   |   2 +
 2 files changed, 114 insertions(+)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java
new file mode 100644
index 00000000000..2edbdc70da2
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCodecFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.ZstandardCodec;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * This class implements a codec factory that is used when reading from Parquet. It adds a
+ * workaround for memory issues encountered when reading from zstd-compressed files. For
+ * details, see <a href="https://issues.apache.org/jira/browse/PARQUET-2160">PARQUET-2160</a>
+ *
+ * TODO: Remove this workaround after upgrading Parquet which include PARQUET-2160.
+ */
+public class ParquetCodecFactory extends CodecFactory {
+
+  public ParquetCodecFactory(Configuration configuration, int pageSize) {
+    super(configuration, pageSize);
+  }
+
+  /**
+   * Copied and modified from CodecFactory.HeapBytesDecompressor
+   */
+  @SuppressWarnings("deprecation")
+  class HeapBytesDecompressor extends BytesDecompressor {
+
+    private final CompressionCodec codec;
+    private final Decompressor decompressor;
+
+    HeapBytesDecompressor(CompressionCodecName codecName) {
+      this.codec = getCodec(codecName);
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+      } else {
+        decompressor = null;
+      }
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      final BytesInput decompressed;
+      if (codec != null) {
+        if (decompressor != null) {
+          decompressor.reset();
+        }
+        InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
+
+        if (codec instanceof ZstandardCodec) {
+          // We need to explicitly close the ZstdDecompressorStream here to release the resources
+          // it holds to avoid off-heap memory fragmentation issue, see PARQUET-2160.
+          // This change will load the decompressor stream into heap a little earlier, since the
+          // problem it solves only happens in the ZSTD codec, so this modification is only made
+          // for ZSTD streams.
+          decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
+          is.close();
+        } else {
+          decompressed = BytesInput.from(is, uncompressedSize);
+        }
+      } else {
+        decompressed = bytes;
+      }
+      return decompressed;
+    }
+
+    @Override
+    public void decompress(
+        ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+        throws IOException {
+      ByteBuffer decompressed =
+          decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
+      output.put(decompressed);
+    }
+
+    @Override
+    public void release() {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
+    }
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
+    return new HeapBytesDecompressor(codecName);
+  }
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 8cefa589c0e..678b287a5e3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -96,6 +96,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     ParquetReadOptions options = HadoopReadOptions
       .builder(configuration, file)
       .withRange(split.getStart(), split.getStart() + split.getLength())
+      .withCodecFactory(new ParquetCodecFactory(configuration, 0))
       .build();
     ParquetFileReader fileReader = new ParquetFileReader(
         HadoopInputFile.fromPath(file, configuration), options);
@@ -159,6 +160,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     ParquetReadOptions options = HadoopReadOptions
       .builder(config, file)
       .withRange(0, length)
+      .withCodecFactory(new ParquetCodecFactory(config, 0))
       .build();
     ParquetFileReader fileReader = ParquetFileReader.open(
       HadoopInputFile.fromPath(file, config), options);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org