Posted to dev@parquet.apache.org by GitBox <gi...@apache.org> on 2022/11/10 07:42:54 UTC

[GitHub] [parquet-mr] wgtmac commented on a diff in pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

wgtmac commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r1018707351


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();

Review Comment:
   CMIIW, but the parquet writer always creates a new CodecFactory internally, so getCompressor does not suffer from any thread-safety issue.


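The quoted patch comment explains why the per-thread bookkeeping is done by hand: a `ThreadLocal` would give each thread its own codec instances, but `release()` must walk every instance ever created, which `ThreadLocal` does not allow. A minimal sketch of that pattern follows; the types here (`CodecName`, `Decompressor`, `PerThreadCodecCache`) are simplified stand-ins for illustration, not the real Parquet classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplified stand-ins for Parquet's CompressionCodecName
// and BytesDecompressor, used only to illustrate the caching pattern.
enum CodecName { SNAPPY, GZIP }

class Decompressor {
    void release() { /* return native buffers, etc. */ }
}

public class PerThreadCodecCache {
    // One inner map per thread. A ThreadLocal would give the same isolation,
    // but release() below needs to iterate over every instance ever created,
    // so the per-thread management is implemented explicitly.
    private final Map<Thread, Map<CodecName, Decompressor>> allDecompressors =
        new ConcurrentHashMap<>();

    public Decompressor getDecompressor(CodecName name) {
        // The inner HashMap is only ever touched by its owning thread,
        // so it does not itself need to be concurrent.
        return allDecompressors
            .computeIfAbsent(Thread.currentThread(), t -> new HashMap<>())
            .computeIfAbsent(name, n -> new Decompressor());
    }

    public void release() {
        for (Map<CodecName, Decompressor> perThread : allDecompressors.values()) {
            for (Decompressor d : perThread.values()) {
                d.release();
            }
        }
        allDecompressors.clear();
    }
}
```

Note the trade-off the reviewer raises below: entries keyed by `Thread` are never removed when a thread exits, so the outer map can only grow over the factory's lifetime.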

##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();
+  private final Map<Thread, Map<CompressionCodecName, BytesDecompressor>> allDecompressors = new ConcurrentHashMap<>();

Review Comment:
   It looks like we cannot remove a cached entry from the map when its thread exits. What is worse, the map can grow without bound if a long-running instance spawns many threads. `getDecompressor` is called only on a per-column-chunk basis, so I expect the call frequency to be fairly low. Would a single `ConcurrentHashMap<CompressionCodecName, BytesDecompressor>()` be sufficient? If we really care about the regression of introducing ConcurrentHashMap in cases where we are sure no concurrency will happen, we could add a new thread-unsafe CodecFactory implementation to do the job. Any thoughts? @theosib-amazon @shangxinli 
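The single-map alternative floated above could be sketched as follows. This is an illustration under simplified stand-in types (`CodecName`, `Decompressor`, `SharedCodecCache` are hypothetical, not the real Parquet classes), and it carries a caveat: `computeIfAbsent` makes the *lookup* thread-safe, but the cached instances themselves are then shared across threads, which only works if the instances are stateless or callers otherwise avoid concurrent use of the same instance.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplified stand-ins for Parquet's CompressionCodecName
// and BytesDecompressor, for illustration only.
enum CodecName { SNAPPY, GZIP }

class Decompressor {
}

public class SharedCodecCache {
    // Single shared map: computeIfAbsent guarantees at most one instance
    // per codec name, even under concurrent calls, with no per-thread
    // bookkeeping and nothing that grows with the number of threads.
    private final Map<CodecName, Decompressor> decompressors = new ConcurrentHashMap<>();

    public Decompressor getDecompressor(CodecName name) {
        return decompressors.computeIfAbsent(name, n -> new Decompressor());
    }

    public void release() {
        decompressors.clear();
    }

    public static void main(String[] args) throws InterruptedException {
        SharedCodecCache cache = new SharedCodecCache();
        // Hammer the cache from several threads; computeIfAbsent ensures
        // they all observe the same cached instance.
        Decompressor[] seen = new Decompressor[4];
        Thread[] threads = new Thread[4];
        for (int i = 0; i < 4; i++) {
            final int idx = i;
            threads[i] = new Thread(() -> seen[idx] = cache.getDecompressor(CodecName.SNAPPY));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        boolean allSame = seen[1] == seen[0] && seen[2] == seen[0] && seen[3] == seen[0];
        System.out.println("all threads saw one instance: " + allSame);
    }
}
```

Whether this is acceptable depends on the point debated in the thread: if the cached (de)compressors hold per-call mutable state, sharing one instance across threads reintroduces the original PARQUET-2126 hazard, and the per-thread design (or a thread-unsafe factory for single-threaded callers) remains necessary.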



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@parquet.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org