Posted to dev@parquet.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/11/10 07:43:00 UTC

[jira] [Commented] (PARQUET-2126) Thread safety bug in CodecFactory

    [ https://issues.apache.org/jira/browse/PARQUET-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631496#comment-17631496 ] 

ASF GitHub Bot commented on PARQUET-2126:
-----------------------------------------

wgtmac commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r1018707351


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();

Review Comment:
   Correct me if I'm wrong, but the Parquet writer always creates a new CodecFactory internally, so `getCompressor` should not suffer from any thread-safety issue.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();
+  private final Map<Thread, Map<CompressionCodecName, BytesDecompressor>> allDecompressors = new ConcurrentHashMap<>();

Review Comment:
   It looks like we cannot remove a cached entry from the map when its thread exits. Worse, the map can grow without bound in a long-running instance that creates many threads. `getDecompressor` is called only on a per-column-chunk basis, so I expect the call frequency to be low. Would a single `ConcurrentHashMap<CompressionCodecName, BytesDecompressor>()` be sufficient? If we are concerned about the overhead that ConcurrentHashMap introduces in cases where we are sure no concurrency will happen, we can add a new thread-unsafe CodecFactory implementation for those cases. Any thoughts? @theosib-amazon @shangxinli
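
The growth concern can be illustrated with a minimal, self-contained sketch. It is not parquet-mr code: `ThreadKeyedCacheDemo`, its methods, and the use of `String` and `Object` as stand-ins for `CompressionCodecName` and `BytesDecompressor` are all hypothetical. It only assumes that the cache is a map keyed by `Thread`, as in the diff above, and that nothing removes entries when a thread finishes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ThreadKeyedCacheDemo {
    // Hypothetical stand-in for a per-thread codec cache keyed by Thread.
    private final Map<Thread, Map<String, Object>> perThread = new ConcurrentHashMap<>();

    // Returns the calling thread's cached instance for the codec name, creating it on first use.
    Object getDecompressor(String codecName) {
        return perThread
            .computeIfAbsent(Thread.currentThread(), t -> new ConcurrentHashMap<>())
            .computeIfAbsent(codecName, n -> new Object());
    }

    int size() {
        return perThread.size();
    }

    // Runs short-lived worker threads one after another and reports how many
    // entries remain in the cache once every worker has exited.
    static int entriesAfter(int workers) {
        ThreadKeyedCacheDemo cache = new ThreadKeyedCacheDemo();
        try {
            for (int i = 0; i < workers; i++) {
                Thread t = new Thread(() -> cache.getDecompressor("GZIP"));
                t.start();
                t.join(); // the worker has fully exited before the next one starts
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return cache.size();
    }

    public static void main(String[] args) {
        System.out.println("entries after workers exit: " + entriesAfter(100));
    }
}
```

Each exited worker leaves one entry behind because the map still holds a strong reference to its `Thread` object, so in this sketch the entry count tracks the number of threads ever started rather than the number currently alive.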





> Thread safety bug in CodecFactory
> ---------------------------------
>
>                 Key: PARQUET-2126
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2126
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-mr
>    Affects Versions: 1.12.2
>            Reporter: James Turton
>            Priority: Major
>
> The code for returning Compressor objects to the caller goes to some lengths to achieve thread safety, including keeping Codec objects in an Apache Commons pool that has thread-safe borrow semantics.  This is all undone by the BytesCompressor and BytesDecompressor Maps in org.apache.parquet.hadoop.CodecFactory which end up caching single compressor and decompressor instances due to code in CodecFactory@getCompressor and CodecFactory@getDecompressor.  When the caller runs multiple threads, those threads end up sharing compressor and decompressor instances.
> For compressors based on Xerial Snappy this bug has no effect because that library is itself thread safe.  But when BuiltInGzipCompressor from Hadoop is selected for the CompressionCodecName.GZIP case, serious problems ensue.  That class is not thread safe and sharing one instance of it between threads produces both silent data corruption and JVM crashes.
> To fix this situation, parquet-mr should stop caching single compressor and decompressor instances.


