You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by dz...@apache.org on 2022/02/17 17:02:27 UTC

[drill] branch master updated: DRILL-8139: Parquet CodecFactory thread safety bug (#2463)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new ebd027b  DRILL-8139: Parquet CodecFactory thread safety bug  (#2463)
ebd027b is described below

commit ebd027bfd72f35267d5b15db3307079400383ca6
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Thu Feb 17 19:02:20 2022 +0200

    DRILL-8139: Parquet CodecFactory thread safety bug  (#2463)
---
 .../compression/AirliftBytesInputCompressor.java   | 22 +++---
 .../compression/DrillCompressionCodecFactory.java  | 81 ++++++++++++++++------
 pom.xml                                            |  2 +-
 3 files changed, 72 insertions(+), 33 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java
index 9170e81..1059af4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java
@@ -19,7 +19,8 @@ package org.apache.drill.exec.store.parquet.compression;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Stack;
+import java.util.Deque;
+import java.util.LinkedList;
 
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
@@ -59,7 +60,7 @@ public class AirliftBytesInputCompressor implements CompressionCodecFactory.Byte
   private ByteBufferAllocator allocator;
 
   // all the direct memory buffers we've allocated, and must release
-  private Stack<ByteBuffer> allocatedBuffers;
+  private Deque<ByteBuffer> allocatedBuffers;
 
   public AirliftBytesInputCompressor(CompressionCodecName codecName, ByteBufferAllocator allocator) {
     this.codecName = codecName;
@@ -86,7 +87,7 @@ public class AirliftBytesInputCompressor implements CompressionCodecFactory.Byte
     }
 
     this.allocator = allocator;
-    this.allocatedBuffers = new Stack<>();
+    this.allocatedBuffers = new LinkedList<>();
 
     logger.debug(
         "constructed a {} using a backing compressor of {}",
@@ -159,13 +160,16 @@ public class AirliftBytesInputCompressor implements CompressionCodecFactory.Byte
 
   @Override
   public void release() {
-    logger.debug(
-        "will release {} allocated buffers.",
-        this.allocatedBuffers.size()
-    );
+    int bufCount  = allocatedBuffers.size();
 
-    while (!this.allocatedBuffers.isEmpty()) {
-      this.allocator.release(allocatedBuffers.pop());
+    // LIFO release order to try to reduce memory fragmentation.
+    int i = 0;
+    while (!allocatedBuffers.isEmpty()) {
+      allocator.release(allocatedBuffers.pop());
+      i++;
     }
+    assert bufCount == i;
+
+    logger.debug("released {} allocated buffers", bufCount);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
index 53eee64..f971b33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
@@ -18,8 +18,10 @@
 package org.apache.drill.exec.store.parquet.compression;
 
 import java.util.Arrays;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
 
@@ -50,17 +52,30 @@ public class DrillCompressionCodecFactory implements CompressionCodecFactory {
 
   // The set of codecs to be handled by aircompressor
   private static final Set<CompressionCodecName> AIRCOMPRESSOR_CODECS = new HashSet<>(
-      Arrays.asList(CompressionCodecName.LZ4, CompressionCodecName.LZO,
-          CompressionCodecName.SNAPPY, CompressionCodecName.ZSTD));
+      Arrays.asList(
+        CompressionCodecName.LZ4,
+        CompressionCodecName.LZO,
+        CompressionCodecName.SNAPPY,
+        CompressionCodecName.ZSTD
+      )
+  );
 
-  // pool of reused aircompressor compressors (parquet-mr's factory has its own)
+  // pool of reusable thread-safe aircompressor compressors (parquet-mr's factory has its own)
   private final Map<CompressionCodecName, AirliftBytesInputCompressor> airCompressors = new HashMap<>();
 
   // fallback parquet-mr compression codec factory
-  private CompressionCodecFactory parqCodecFactory;
+  // TODO: uncomment once PARQUET-2126 is fixed.
+  // private final CompressionCodecFactory parqCodecFactory;
 
   // direct memory allocator to be used during (de)compression
-  private ByteBufferAllocator allocator;
+  private final ByteBufferAllocator allocator;
+
+  // Start: members for working around a CodecFactory concurrency bug c.f. DRILL-8139
+  // TODO: remove once PARQUET-2126 is fixed.
+  private final Deque<CompressionCodecFactory> singleUseFactories;
+  private final Configuration config;
+  private final int pageSize;
+  // End
 
   // static builder method, solely to mimick the parquet-mr API as closely as possible
   public static CompressionCodecFactory createDirectCodecFactory(Configuration config, ByteBufferAllocator allocator,
@@ -69,49 +84,69 @@ public class DrillCompressionCodecFactory implements CompressionCodecFactory {
   }
 
   public DrillCompressionCodecFactory(Configuration config, ByteBufferAllocator allocator, int pageSize) {
+    this.config = config;
     this.allocator = allocator;
-    this.parqCodecFactory = CodecFactory.createDirectCodecFactory(config, allocator, pageSize);
-
-    logger.debug(
-        "constructed a {} using a fallback factory of {}",
-        getClass().getName(),
-        parqCodecFactory.getClass().getName()
-    );
+    this.pageSize = pageSize;
+    this.singleUseFactories = new LinkedList<>();
+    // TODO: uncomment once PARQUET-2126 is fixed.
+    // this.parqCodecFactory = CodecFactory.createDirectCodecFactory(config, allocator, pageSize);
   }
 
   @Override
-  public BytesInputCompressor getCompressor(CompressionCodecName codecName) {
+  public synchronized BytesInputCompressor getCompressor(CompressionCodecName codecName) {
     if (AIRCOMPRESSOR_CODECS.contains(codecName)) {
       return airCompressors.computeIfAbsent(
           codecName,
           c -> new AirliftBytesInputCompressor(codecName, allocator)
       );
     } else {
-      return parqCodecFactory.getCompressor(codecName);
+      // Work around PARQUET-2126: construct a new codec factory every time to
+      // avoid a concurrency bug c.f. DRILL-8139.  Fortunately, constructing
+      // and releasing codec factories appears to be lightweight.
+      CompressionCodecFactory ccf = CodecFactory.createDirectCodecFactory(config, allocator, pageSize);
+      // hold onto a reference for later release()
+      singleUseFactories.add(ccf);
+      return ccf.getCompressor(codecName);
+
+      // TODO: replace the above with the below once PARQUET-2126 is fixed
+      // return parqCodecFactory.getCompressor(codecName);
     }
   }
 
   @Override
-  public BytesInputDecompressor getDecompressor(CompressionCodecName codecName) {
+  public synchronized BytesInputDecompressor getDecompressor(CompressionCodecName codecName) {
     if (AIRCOMPRESSOR_CODECS.contains(codecName)) {
       return airCompressors.computeIfAbsent(
           codecName,
           c -> new AirliftBytesInputCompressor(codecName, allocator)
       );
     } else {
-      return parqCodecFactory.getDecompressor(codecName);
+      // Work around PARQUET-2126: construct a new codec factory every time to
+      // avoid a concurrency bug c.f. DRILL-8139.  Fortunately, constructing
+      // and releasing codec factories appears to be lightweight.
+      CompressionCodecFactory ccf = CodecFactory.createDirectCodecFactory(config, allocator, pageSize);
+      // hold onto a reference for later release()
+      singleUseFactories.add(ccf);
+      return ccf.getDecompressor(codecName);
+
+      // TODO: replace the above with the below once PARQUET-2126 is fixed
+      // return parqCodecFactory.getDecompressor(codecName);
     }
   }
 
   @Override
-  public void release() {
-    parqCodecFactory.release();
-    logger.debug("released {}", parqCodecFactory);
+  public synchronized void release() {
+    // TODO: uncomment once PARQUET-2126 is fixed.
+    // parqCodecFactory.release();
+    // logger.debug("released {}", parqCodecFactory);
 
-    for (AirliftBytesInputCompressor abic : airCompressors.values()) {
-      abic.release();
-      logger.debug("released {}", abic);
-    }
+    airCompressors.values().forEach(AirliftBytesInputCompressor::release);
+    logger.debug("released {} aircompressors", airCompressors.size());
     airCompressors.clear();
+
+    // TODO: remove once PARQUET-2126 is fixed.
+    singleUseFactories.forEach(CompressionCodecFactory::release);
+    logger.debug("released {} single-use codec factories.", singleUseFactories.size());
+    singleUseFactories.clear();
   }
 }
diff --git a/pom.xml b/pom.xml
index f752ebf..f2f4fad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,7 +51,7 @@
     <shaded.guava.version>28.2-jre</shaded.guava.version>
     <guava.version>30.1.1-jre</guava.version>
     <forkCount>2</forkCount>
-    <parquet.version>1.12.0</parquet.version>
+    <parquet.version>1.12.2</parquet.version>
     <parquet.format.version>2.8.0</parquet.format.version>
     <!--
       For development purposes to be able to use custom Calcite versions (e.g. not present in jitpack