You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@drill.apache.org by dz...@apache.org on 2022/02/17 17:02:27 UTC
[drill] branch master updated: DRILL-8139: Parquet CodecFactory thread safety bug (#2463)
This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new ebd027b DRILL-8139: Parquet CodecFactory thread safety bug (#2463)
ebd027b is described below
commit ebd027bfd72f35267d5b15db3307079400383ca6
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Thu Feb 17 19:02:20 2022 +0200
DRILL-8139: Parquet CodecFactory thread safety bug (#2463)
---
.../compression/AirliftBytesInputCompressor.java | 22 +++---
.../compression/DrillCompressionCodecFactory.java | 81 ++++++++++++++++------
pom.xml | 2 +-
3 files changed, 72 insertions(+), 33 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java
index 9170e81..1059af4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java
@@ -19,7 +19,8 @@ package org.apache.drill.exec.store.parquet.compression;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Stack;
+import java.util.Deque;
+import java.util.LinkedList;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
@@ -59,7 +60,7 @@ public class AirliftBytesInputCompressor implements CompressionCodecFactory.Byte
private ByteBufferAllocator allocator;
// all the direct memory buffers we've allocated, and must release
- private Stack<ByteBuffer> allocatedBuffers;
+ private Deque<ByteBuffer> allocatedBuffers;
public AirliftBytesInputCompressor(CompressionCodecName codecName, ByteBufferAllocator allocator) {
this.codecName = codecName;
@@ -86,7 +87,7 @@ public class AirliftBytesInputCompressor implements CompressionCodecFactory.Byte
}
this.allocator = allocator;
- this.allocatedBuffers = new Stack<>();
+ this.allocatedBuffers = new LinkedList<>();
logger.debug(
"constructed a {} using a backing compressor of {}",
@@ -159,13 +160,16 @@ public class AirliftBytesInputCompressor implements CompressionCodecFactory.Byte
@Override
public void release() {
- logger.debug(
- "will release {} allocated buffers.",
- this.allocatedBuffers.size()
- );
+ int bufCount = allocatedBuffers.size();
- while (!this.allocatedBuffers.isEmpty()) {
- this.allocator.release(allocatedBuffers.pop());
+ // LIFO release order to try to reduce memory fragmentation.
+ int i = 0;
+ while (!allocatedBuffers.isEmpty()) {
+ allocator.release(allocatedBuffers.pop());
+ i++;
}
+ assert bufCount == i;
+
+ logger.debug("released {} allocated buffers", bufCount);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
index 53eee64..f971b33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
@@ -18,8 +18,10 @@
package org.apache.drill.exec.store.parquet.compression;
import java.util.Arrays;
+import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
@@ -50,17 +52,30 @@ public class DrillCompressionCodecFactory implements CompressionCodecFactory {
// The set of codecs to be handled by aircompressor
private static final Set<CompressionCodecName> AIRCOMPRESSOR_CODECS = new HashSet<>(
- Arrays.asList(CompressionCodecName.LZ4, CompressionCodecName.LZO,
- CompressionCodecName.SNAPPY, CompressionCodecName.ZSTD));
+ Arrays.asList(
+ CompressionCodecName.LZ4,
+ CompressionCodecName.LZO,
+ CompressionCodecName.SNAPPY,
+ CompressionCodecName.ZSTD
+ )
+ );
- // pool of reused aircompressor compressors (parquet-mr's factory has its own)
+ // pool of reusable thread-safe aircompressor compressors (parquet-mr's factory has its own)
private final Map<CompressionCodecName, AirliftBytesInputCompressor> airCompressors = new HashMap<>();
// fallback parquet-mr compression codec factory
- private CompressionCodecFactory parqCodecFactory;
+ // TODO: uncomment once PARQUET-2126 is fixed.
+ // private final CompressionCodecFactory parqCodecFactory;
// direct memory allocator to be used during (de)compression
- private ByteBufferAllocator allocator;
+ private final ByteBufferAllocator allocator;
+
+ // Start: members for working around a CodecFactory concurrency bug c.f. DRILL-8139
+ // TODO: remove once PARQUET-2126 is fixed.
+ private final Deque<CompressionCodecFactory> singleUseFactories;
+ private final Configuration config;
+ private final int pageSize;
+ // End
// static builder method, solely to mimick the parquet-mr API as closely as possible
public static CompressionCodecFactory createDirectCodecFactory(Configuration config, ByteBufferAllocator allocator,
@@ -69,49 +84,69 @@ public class DrillCompressionCodecFactory implements CompressionCodecFactory {
}
public DrillCompressionCodecFactory(Configuration config, ByteBufferAllocator allocator, int pageSize) {
+ this.config = config;
this.allocator = allocator;
- this.parqCodecFactory = CodecFactory.createDirectCodecFactory(config, allocator, pageSize);
-
- logger.debug(
- "constructed a {} using a fallback factory of {}",
- getClass().getName(),
- parqCodecFactory.getClass().getName()
- );
+ this.pageSize = pageSize;
+ this.singleUseFactories = new LinkedList<>();
+ // TODO: uncomment once PARQUET-2126 is fixed.
+ // this.parqCodecFactory = CodecFactory.createDirectCodecFactory(config, allocator, pageSize);
}
@Override
- public BytesInputCompressor getCompressor(CompressionCodecName codecName) {
+ public synchronized BytesInputCompressor getCompressor(CompressionCodecName codecName) {
if (AIRCOMPRESSOR_CODECS.contains(codecName)) {
return airCompressors.computeIfAbsent(
codecName,
c -> new AirliftBytesInputCompressor(codecName, allocator)
);
} else {
- return parqCodecFactory.getCompressor(codecName);
+ // Work around PARQUET-2126: construct a new codec factory every time to
+ // avoid a concurrrency bug c.f. DRILL-8139. Fortunately, constructing
+ // and releasing codec factories appears to be light weight.
+ CompressionCodecFactory ccf = CodecFactory.createDirectCodecFactory(config, allocator, pageSize);
+ // hold onto a reference for later release()
+ singleUseFactories.add(ccf);
+ return ccf.getCompressor(codecName);
+
+ // TODO: replace the above with the below PARQUET-2126 is fixed
+ // return parqCodecFactory.getDecompressor(codecName);
}
}
@Override
- public BytesInputDecompressor getDecompressor(CompressionCodecName codecName) {
+ public synchronized BytesInputDecompressor getDecompressor(CompressionCodecName codecName) {
if (AIRCOMPRESSOR_CODECS.contains(codecName)) {
return airCompressors.computeIfAbsent(
codecName,
c -> new AirliftBytesInputCompressor(codecName, allocator)
);
} else {
- return parqCodecFactory.getDecompressor(codecName);
+ // Work around PARQUET-2126: construct a new codec factory every time to
+ // avoid a concurrrency bug c.f. DRILL-8139. Fortunately, constructing
+ // and releasing codec factories appears to be light weight.
+ CompressionCodecFactory ccf = CodecFactory.createDirectCodecFactory(config, allocator, pageSize);
+ // hold onto a reference for later release()
+ singleUseFactories.add(ccf);
+ return ccf.getDecompressor(codecName);
+
+ // TODO: replace the above with the below PARQUET-2126 is fixed
+ // return parqCodecFactory.getDecompressor(codecName);
}
}
@Override
- public void release() {
- parqCodecFactory.release();
- logger.debug("released {}", parqCodecFactory);
+ public synchronized void release() {
+ // TODO: uncomment once PARQUET-2126 is fixed.
+ // parqCodecFactory.release();
+ // logger.debug("released {}", parqCodecFactory);
- for (AirliftBytesInputCompressor abic : airCompressors.values()) {
- abic.release();
- logger.debug("released {}", abic);
- }
+ airCompressors.values().forEach(AirliftBytesInputCompressor::release);
+ logger.debug("released {} aircompressors", airCompressors.size());
airCompressors.clear();
+
+ // TODO: remove once PARQUET-2126 is fixed.
+ singleUseFactories.forEach(CompressionCodecFactory::release);
+ logger.debug("released {} single-use codec factories.", singleUseFactories.size());
+ singleUseFactories.clear();
}
}
diff --git a/pom.xml b/pom.xml
index f752ebf..f2f4fad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,7 +51,7 @@
<shaded.guava.version>28.2-jre</shaded.guava.version>
<guava.version>30.1.1-jre</guava.version>
<forkCount>2</forkCount>
- <parquet.version>1.12.0</parquet.version>
+ <parquet.version>1.12.2</parquet.version>
<parquet.format.version>2.8.0</parquet.format.version>
<!--
For development purposes to be able to use custom Calcite versions (e.g. not present in jitpack