Posted to dev@parquet.apache.org by GitBox <gi...@apache.org> on 2022/04/21 17:21:14 UTC

[GitHub] [parquet-mr] theosib-amazon opened a new pull request, #959: PARQUET-2126: Make cached (de)compressors thread-safe

theosib-amazon opened a new pull request, #959:
URL: https://github.com/apache/parquet-mr/pull/959

   CodecFactory cached instances of compressors and decompressors across
   threads, which was not thread-safe. This change makes the caches
   thread-local.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@parquet.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [parquet-mr] shangxinli commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
shangxinli commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1193390004

   @theosib-amazon, I am not concerned about the case where release/close isn't called, and I agree that the caller must call release/close when finished. My question is about what happens before release/close is called: short-lived threads may create compressors/decompressors in the cache and then exit without the cache being aware of it, so the cache grows with a lot of dead compressors/decompressors. In a scenario where short-lived threads come and go as a matter of course, this could be a problem. I know it is normally not a problem because in most cases a thread pool is used, but I am not sure that no such corner case exists. Parquet is a low-level library and is used in so many ways.
   
   I am sorry if my previous comment wasn't clear.




[GitHub] [parquet-mr] shangxinli commented on a diff in pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
shangxinli commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r873237300


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -184,8 +192,18 @@ public CompressionCodecName getCodecName() {
 
   }
 
+  /*
+  Modified for https://issues.apache.org/jira/browse/PARQUET-2126
+   */
   @Override
   public BytesCompressor getCompressor(CompressionCodecName codecName) {
+    Thread me = Thread.currentThread();

Review Comment:
   I haven't read the implementation of the Thread class, but is its hash code stable? In other words, if Thread.currentThread() is called twice on the same thread, could it return two different hash codes?





[GitHub] [parquet-mr] shangxinli commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
shangxinli commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1127067783

   If we change it to be per thread, would it be a problem in a scenario where short-lived threads come and go? When a thread stops, we might not know about it, and we would leak here.
   
   And please add tests.




[GitHub] [parquet-mr] dossett commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
dossett commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1106556192

   Seems good to me (non-binding!).  Revisiting whether or not the caching strategy makes sense might be worthwhile, but that shouldn't stop this fix. 
   
   Small comment: I would remove most of the references to the JIRA ticket as well as descriptions of the old behavior.  I think the comment that describes the new behavior and why it might be unintuitive with a reference to the JIRA makes sense though.  I'll defer to others in the project (again, I'm not a committer) if there are existing standards for this though.




[GitHub] [parquet-mr] jnturton commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
jnturton commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1191048626

   > Are you concerned about leaking if release/close isn't called? I'm pretty sure that would result in leaks. I suppose that might be solvable if we added a finalize() method that called release(). That might solve the problem. Should we do that?
   
   My 2c: finalize() is problematic and deprecated in Java so I don't recommend adding it. The requirement here that the caller must close after they're finished is totally reasonable and to be found in APIs everywhere.
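
As a rough illustration of the "caller must close" contract described above, here is a minimal sketch using AutoCloseable and try-with-resources. `TrackedCodecCache` and `CloseContractDemo` are hypothetical names invented for this example; they are not parquet-mr classes, and the real CodecFactory API may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a factory that caches resources; not a parquet-mr class.
class TrackedCodecCache implements AutoCloseable {
  private final List<String> cached = new ArrayList<>();
  private boolean closed = false;

  // Pretend to create and cache a codec, identified here by name only.
  String acquire(String codecName) {
    if (closed) {
      throw new IllegalStateException("cache already closed");
    }
    cached.add(codecName);
    return codecName;
  }

  int cachedCount() {
    return cached.size();
  }

  boolean isClosed() {
    return closed;
  }

  // The caller is responsible for invoking this; nothing runs it at GC time.
  @Override
  public void close() {
    cached.clear();
    closed = true;
  }
}

class CloseContractDemo {
  // Uses the cache inside try-with-resources and returns it for inspection.
  static TrackedCodecCache useAndClose() {
    TrackedCodecCache cache = new TrackedCodecCache();
    try (TrackedCodecCache c = cache) {
      c.acquire("SNAPPY");
      c.acquire("GZIP");
    } // close() runs here, even if the body throws
    return cache;
  }
}
```

With this shape the cleanup point is explicit in the calling code, which is the property that finalize() cannot guarantee.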




[GitHub] [parquet-mr] theosib-amazon commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
theosib-amazon commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1106632106

   Alright. You have a point. If the maintainers want me to delete that stuff, they can let me know, and I'll go ahead and do it.
   




[GitHub] [parquet-mr] theosib-amazon commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
theosib-amazon commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1129005310

   I added a test.




[GitHub] [parquet-mr] theosib-amazon commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
theosib-amazon commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1194158751

   One option is to provide another API call that releases the cached instance for only the current thread. What should we call it? I forget whether close or release is used more, but if everyone is using close, we could repurpose release to apply to only the current thread.




[GitHub] [parquet-mr] theosib-amazon commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
theosib-amazon commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1195676003

   I just thought of something that makes me nervous about this PR that requires further investigation. Consider the following scenario:
   - Thread A allocates a codec
   - Thread A releases the codec, which puts it into a global pool of codecs
   - Thread B allocates the same kind of codec, which comes from that same pool
   - Thread A allocates that same kind of codec again, but it gets it from the factory's map instead of the pool
   I'm concerned that this could result in the same codec being given to both threads at the same time. The solution would be to remove the codec from the factory's map when release() is called on the codec itself. 
   Note that this problem is not introduced by this PR, since the double pooling existed before. The irony is that the pool is thread-safe, while the factory was not.
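
The four steps above can be replayed deterministically in a small model. This is only a sketch: `FakeCodec`, `FakeGlobalPool`, `FakeFactory`, and `DoublePoolingDemo` are hypothetical stand-ins, the "threads" are represented by plain string owners, and the `evictOnRelease` flag models the proposed fix of removing the codec from the factory's map on release. None of this is the actual CodecFactory or CodecPool code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins that model the scenario; not parquet-mr or Hadoop classes.
class FakeCodec {}

class FakeGlobalPool {
  private final Deque<FakeCodec> idle = new ArrayDeque<>();

  // Hand out an idle codec if one exists, otherwise create a new one.
  FakeCodec borrow() {
    FakeCodec codec = idle.poll();
    return codec != null ? codec : new FakeCodec();
  }

  void giveBack(FakeCodec codec) {
    idle.push(codec);
  }
}

class FakeFactory {
  private final FakeGlobalPool pool;
  private final Map<String, FakeCodec> cachedByOwner = new HashMap<>();
  private final boolean evictOnRelease;

  FakeFactory(FakeGlobalPool pool, boolean evictOnRelease) {
    this.pool = pool;
    this.evictOnRelease = evictOnRelease;
  }

  // Serve from the factory's own map first; fall back to the pool on a miss.
  FakeCodec get(String owner) {
    FakeCodec codec = cachedByOwner.get(owner);
    if (codec == null) {
      codec = pool.borrow();
      cachedByOwner.put(owner, codec);
    }
    return codec;
  }

  // Return the owner's codec to the pool; optionally also drop it from the map.
  void release(String owner) {
    FakeCodec codec = evictOnRelease ? cachedByOwner.remove(owner) : cachedByOwner.get(owner);
    if (codec != null) {
      pool.giveBack(codec);
    }
  }
}

class DoublePoolingDemo {
  // Replays the four steps in order; returns true if A and B hold the same codec.
  static boolean sharedAfterReplay(boolean evictOnRelease) {
    FakeFactory factory = new FakeFactory(new FakeGlobalPool(), evictOnRelease);
    factory.get("A");                    // A allocates a codec
    factory.release("A");                // A releases it into the pool
    FakeCodec forB = factory.get("B");   // B allocates and is served from the pool
    FakeCodec forA = factory.get("A");   // A allocates again
    return forA == forB;
  }
}
```

In this model, `sharedAfterReplay(false)` ends with both owners holding the same instance, while `sharedAfterReplay(true)` does not, which matches the intuition behind removing the map entry on release.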




[GitHub] [parquet-mr] theosib-amazon commented on a diff in pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
theosib-amazon commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r873888258


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> all_compressors = new ConcurrentHashMap<>();

Review Comment:
   I changed it to camel case.





[GitHub] [parquet-mr] steveloughran commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
steveloughran commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1199891512

   You might want to look at WeakReferences. We've been using them recently to implement threadlocal-like storage where GCs will trigger cleanup of instances which aren't being used any more:
   https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java
   https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java
   
   The evolution of that code would be to implement the callback the JVM can issue on reference expiry and so do extra cleanup there.
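
For readers unfamiliar with the idea, here is a minimal sketch of weakly keyed per-thread storage. It swaps in the JDK's `WeakHashMap` rather than reproducing the Hadoop classes linked above, so it is a simpler, related technique and not their implementation. `WeakPerThreadCache` and its methods are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch; not Hadoop's WeakReferenceThreadMap and not parquet-mr code.
class WeakPerThreadCache<V> {
  // WeakHashMap holds its keys weakly: once a Thread object is no longer strongly
  // reachable elsewhere, its entry becomes eligible for removal after a GC.
  // Caveat: the value must not strongly reference its Thread key, or the entry stays.
  private final Map<Thread, V> byThread = Collections.synchronizedMap(new WeakHashMap<>());
  private final Supplier<V> factory;

  WeakPerThreadCache(Supplier<V> factory) {
    this.factory = factory;
  }

  // Returns the calling thread's instance, creating it on first use.
  V get() {
    return byThread.computeIfAbsent(Thread.currentThread(), t -> factory.get());
  }

  // Helper for demonstration: the value that a freshly started thread would see.
  V getFromNewThread() {
    AtomicReference<V> seen = new AtomicReference<>();
    Thread worker = new Thread(() -> seen.set(get()));
    worker.start();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen.get();
  }
}
```

Weak keys only make entries collectable; they do not run any cleanup on the cached value, which is why the comment above mentions a callback on reference expiry as the next step.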




[GitHub] [parquet-mr] shangxinli commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
shangxinli commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1127847443

    My point is that when a thread exits, we don't have a corresponding evict operation on the map. Using a thread pool might be OK if the thread objects don't change, but I am not sure whether there is a scenario where threads are created and exit quickly, and we leak in that case. 




[GitHub] [parquet-mr] theosib-amazon commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
theosib-amazon commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1194259084

   I did some poking around. It looks like if you call release() on a codec, it (a) resets the codec (freeing resources, I think) and (b) returns it to a pool of codecs without actually destroying the codec. 
   
   Later, when release() is called on the factory, it just calls release() again on each of the codecs, returning them to the pool. The only other effect is that references are removed from a container in the factory.
   
   The only question, then, is what happens if release is called twice on a codec. It looks like nothing happens because CodecPool.payback() will return false when the codec is already in the pool. Moreover, I'm pretty sure the original implementation already did this.
   
   So I think the solution is to literally do nothing. The new usage pattern is now:
   - Create Codec factory
   - Create worker threads
   - Threads create codecs
   - Threads finish using codecs
   - Threads *optionally* call release on their codecs if they want to free resources right away.
   - Threads terminate
   - The thread that created the worker threads waits until those threads are done
   - release is called on the factory, cleaning up any codecs that were not released already
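
The usage pattern above can be sketched as follows. `LifecycleFactory` and `LifecycleDemo` are hypothetical stand-ins invented for this example (a `StringBuilder` plays the role of a codec); the real CodecFactory behaves differently in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a shared factory with a per-thread cache; not parquet-mr code.
class LifecycleFactory {
  private final Map<Thread, StringBuilder> perThread = new ConcurrentHashMap<>();

  // Each thread gets its own instance on first use.
  StringBuilder codecForCurrentThread() {
    return perThread.computeIfAbsent(Thread.currentThread(), t -> new StringBuilder());
  }

  int cachedCount() {
    return perThread.size();
  }

  // Called once by the owning thread after all workers have finished.
  void release() {
    perThread.clear();
  }
}

class LifecycleDemo {
  // Runs the steps in order; returns {entries before release, entries after release}.
  static int[] run(int workers) {
    LifecycleFactory factory = new LifecycleFactory();   // create the factory
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < workers; i++) {                  // create worker threads
      Thread t = new Thread(() -> factory.codecForCurrentThread().append("work"));
      threads.add(t);
      t.start();
    }
    for (Thread t : threads) {                           // wait until the workers are done
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    int before = factory.cachedCount();                  // entries of finished threads remain
    factory.release();                                   // release on the factory cleans up
    return new int[] {before, factory.cachedCount()};
  }
}
```

In this model the entries of terminated workers stay in the map until `release()` runs, which is the window the reviewers are discussing.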
   




[GitHub] [parquet-mr] ggershinsky commented on a diff in pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
ggershinsky commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r984394753


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -244,16 +272,60 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) {
     }
   }
 
+  /**
+   * Modified for https://issues.apache.org/jira/browse/PARQUET-2126
+   * This releases all cached instances of all compressors and decompressors created by all threads that share
+   * this CodeFactory instance.
+   * Note: A problem might occur if release() were called while some codec instances were still in use, but it
+   * would not make sense to call close() or release() on a shared CodecFactory while some threads are still
+   * actively using it. The usage pattern should be:

Review Comment:
   It would also be good to have a look at an example of such callers, e.g. in Apache Spark, to start analyzing the implications.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -244,16 +272,60 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) {
     }
   }
 
+  /**
+   * Modified for https://issues.apache.org/jira/browse/PARQUET-2126
+   * This releases all cached instances of all compressors and decompressors created by all threads that share
+   * this CodeFactory instance.
+   * Note: A problem might occur if release() were called while some codec instances were still in use, but it
+   * would not make sense to call close() or release() on a shared CodecFactory while some threads are still
+   * actively using it. The usage pattern should be:

Review Comment:
   Regarding the code that calls this release() method: for the callers inside the parquet-mr codebase, can some of them implement/enforce this pattern fully internally? I'd guess most or all of them eventually pass this responsibility to the "app" code above the parquet-mr API, so this pattern documentation should probably be moved, copied, or referenced in those APIs. 





[GitHub] [parquet-mr] theosib-amazon commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
theosib-amazon commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1127839048

   > If we change it to be per thread, then would it be a problem in the scenario where short living threads come and go? When the thread stopped, we might not know and leak here.
   > 
   > And, please add tests
   
   This is why I use the concurrent hash map, indexed by the thread. Short-lived threads are not a problem in that case.
   
   I can't think of how I would go about testing this. Do you have any ideas? I'll have a look to see if there exist any tests already and see if I can add something.




[GitHub] [parquet-mr] shangxinli commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
shangxinli commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1189198163

   @theosib-amazon Do you still have time for addressing the feedback? I think we are very close to merge. 




[GitHub] [parquet-mr] theosib-amazon commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
theosib-amazon commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1190765848

   > @theosib-amazon Do you still have time for addressing the feedback? I think we are very close to merge.
   
   I'm not really sure which feedback to address. Are you concerned about leaking if release/close isn't called? I'm pretty sure that would result in leaks. I suppose that might be solvable if we added a finalize() method that called release(). That might solve the problem. Should we do that?




[GitHub] [parquet-mr] shangxinli commented on a diff in pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
shangxinli commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r873234812


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> all_compressors = new ConcurrentHashMap<>();

Review Comment:
   In Java, we don't use '_' in variable names. I think just calling it compressors should be fine.





[GitHub] [parquet-mr] theosib-amazon commented on a diff in pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
theosib-amazon commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r873884939


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -184,8 +192,18 @@ public CompressionCodecName getCodecName() {
 
   }
 
+  /*
+  Modified for https://issues.apache.org/jira/browse/PARQUET-2126
+   */
   @Override
   public BytesCompressor getCompressor(CompressionCodecName codecName) {
+    Thread me = Thread.currentThread();

Review Comment:
   A Thread object is created once and exists until the thread dies. Thread does not override hashCode(), so it falls back to the implementation in Object, which returns a fixed object ID.
   
   I did consider using ThreadLocal, but then it would not be possible for release() to clean up all of the (de)compressors from defunct threads.
   
   The way I did it appears to be the recommended solution, since that's what I find when I google this problem.
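
To make the trade-off with ThreadLocal concrete, here is a minimal sketch of an explicit per-thread map whose cleanup method can reach every cached instance from any thread. `DemoCodecName`, `DemoCompressor`, and `PerThreadCompressors` are hypothetical names for illustration; this is not the code in the PR.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical names throughout; this models the idea and is not the PR's code.
enum DemoCodecName { SNAPPY, GZIP }

class DemoCompressor {
  private boolean released = false;

  void release() {
    released = true;
  }

  boolean isReleased() {
    return released;
  }
}

class PerThreadCompressors {
  // The outer map is keyed by the Thread object, whose identity hash is fixed for
  // its lifetime. Each inner map is only touched by its owning thread until cleanup.
  private final Map<Thread, Map<DemoCodecName, DemoCompressor>> all = new ConcurrentHashMap<>();

  // Returns the calling thread's compressor for the given name, creating it if needed.
  DemoCompressor get(DemoCodecName name) {
    return all
        .computeIfAbsent(Thread.currentThread(), t -> new EnumMap<>(DemoCodecName.class))
        .computeIfAbsent(name, n -> new DemoCompressor());
  }

  // Unlike a ThreadLocal, the explicit map lets any thread reach every cached instance.
  // Assumes all worker threads have finished before this is called.
  int releaseAll() {
    int count = 0;
    for (Map<DemoCodecName, DemoCompressor> perThread : all.values()) {
      for (DemoCompressor compressor : perThread.values()) {
        compressor.release();
        count++;
      }
    }
    all.clear();
    return count;
  }
}
```

A `ThreadLocal<Map<...>>` would give the same per-thread lookup, but its values are only reachable from the thread that set them, so a single `releaseAll()` of this kind would not be possible.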





[GitHub] [parquet-mr] shangxinli commented on a diff in pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
shangxinli commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r873237300


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -184,8 +192,18 @@ public CompressionCodecName getCodecName() {
 
   }
 
+  /*
+  Modified for https://issues.apache.org/jira/browse/PARQUET-2126
+   */
   @Override
   public BytesCompressor getCompressor(CompressionCodecName codecName) {
+    Thread me = Thread.currentThread();

Review Comment:
   I haven't read the implementation of the Thread class, but is its hash code stable? In other words, if Thread.currentThread() is called twice on the same thread, could it return two different hash codes? Consider using ThreadLocal; I am not sure whether the synchronization would be a problem.





[GitHub] [parquet-mr] theosib-amazon commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
theosib-amazon commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1127885617

   > My question is when a thread exits, we don't have a corresponding evict operation on the map. Using thread pool might be OK if the thread object is not changed, but not sure if there is a scenario where threads are created/exited quickly and we leak in that case.
   
   No matter what thread release() is called from, it will clean up all (de)compressors from all threads. I designed it this way specifically so that a leak won't happen, as long as close/release is called when it should be.
   
   Note that it's not appropriate to call close or release while (de)compression is still going on. If someone does that, it might still work, but it would be a protocol violation. The usage pattern should be:
   
   - Create Codec factory
   - Create worker threads
   - Threads create codecs
   - Threads finish using codecs
   - Threads terminate
   - The thread that created the worker threads waits until those threads are done
   - close/release is called.
   
   Someone might do something different, but that would be a bug no different from someone closing a file in one thread while it's being written to in another.




[GitHub] [parquet-mr] shangxinli commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
shangxinli commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1151340364

   Thanks for addressing the feedback! What I meant was that, ideally, when 'Threads terminate' happens, the compressor/decompressor should be cleaned up immediately. I understand that we won't leak once 'close/release is called', though. 
   
   > Seems good to me (non-binding!). Revisiting whether or not the caching strategy make sense might be worthwhile, but that shouldn't stop this fix.
   > 
   > Small comment: I would remove most of the references to the JIRA ticket as well as descriptions of the old behavior. I think the comment that describes the new behavior and why it might be unintuitive with a reference to the JIRA makes sense though. I'll defer to others in the project (again, I'm not a committer) if there are existing standards for this though.
   
   @dossett, we don't have a standard like that. It seems OK to have. What do you think? 




[GitHub] [parquet-mr] dossett commented on pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
dossett commented on PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#issuecomment-1151467318

   @shangxinli I do not feel strongly about it.  I think historical context is better kept in JIRAs and PR discussion than in code comments, but that is just a style choice if there's no standard.  (I appreciate you following up, btw!)




[GitHub] [parquet-mr] wgtmac commented on a diff in pull request #959: PARQUET-2126: Make cached (de)compressors thread-safe

Posted by GitBox <gi...@apache.org>.
wgtmac commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r1018707351


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();

Review Comment:
   CMIIW, the parquet writer will always create a new CodecFactory internally, so getCompressor does not suffer from any thread-safety issue.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();
+  private final Map<Thread, Map<CompressionCodecName, BytesDecompressor>> allDecompressors = new ConcurrentHashMap<>();

Review Comment:
   It looks like we cannot remove the cached entry from the map when a thread exits. What is worse, the map will grow without bound if there are many threads in a long-running instance. `getDecompressor` is called only on a per-column-chunk basis, so I expect the call frequency would not be very high. Would a single `ConcurrentHashMap<CompressionCodecName, BytesDecompressor>()` be sufficient? If we really care about the regression of introducing ConcurrentHashMap when we are sure no concurrency will happen, we can add a new thread-unsafe CodecFactory implementation to do the job. Any thoughts? @theosib-amazon @shangxinli 
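
As a rough sketch of the single-map alternative raised above, the following model keys a `ConcurrentHashMap` by codec name only. `SharedCodecName`, `SharedDecompressor`, and `SingleMapCache` are hypothetical names; the sketch only shows how the map behaves and makes no claim about whether a shared decompressor instance is itself safe to use from several threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical names; a model of the single-map alternative, not parquet-mr code.
enum SharedCodecName { SNAPPY, GZIP }

class SharedDecompressor {}

class SingleMapCache {
  private final Map<SharedCodecName, SharedDecompressor> decompressors = new ConcurrentHashMap<>();
  private final AtomicInteger created = new AtomicInteger();

  // computeIfAbsent is atomic per key on ConcurrentHashMap, so each codec name
  // maps to exactly one instance no matter how many threads ask for it.
  SharedDecompressor get(SharedCodecName name) {
    return decompressors.computeIfAbsent(name, n -> {
      created.incrementAndGet();
      return new SharedDecompressor();
    });
  }

  int createdCount() {
    return created.get();
  }

  int size() {
    return decompressors.size();
  }
}
```

The map size here is bounded by the number of codec names rather than the number of threads. The thread safety of the map does not extend to the cached instances, though, so every thread would be handed the same decompressor object.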


