Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/31 14:11:06 UTC

[GitHub] [flink] dawidwys commented on pull request #19219: [FLINK-26835][serialization] Fix concurrent modification exception

dawidwys commented on pull request #19219:
URL: https://github.com/apache/flink/pull/19219#issuecomment-1084641245


   I second @pnowojski's opinion that serializers should be considered not thread-safe and used accordingly. Whenever we pass a serializer to another thread, it should be a duplicate, and once we pass it we should cede ownership of that duplicate to the receiving thread.
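To illustrate the ownership rule, here is a minimal sketch. `StatefulSerializer` and `OwnershipHandoff` are hypothetical stand-ins invented for this example, not Flink classes; the only assumption is a serializer with mutable internal state and a `duplicate()` method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a stateful serializer; not Flink's TypeSerializer.
final class StatefulSerializer {
    // Mutable scratch state is what makes sharing across threads unsafe.
    private final List<String> scratch = new ArrayList<>();

    StatefulSerializer duplicate() {
        // A duplicate gets its own, independent scratch state.
        return new StatefulSerializer();
    }

    int serialize(String value) {
        scratch.clear();
        scratch.add(value);
        return scratch.size();
    }
}

final class OwnershipHandoff {
    // Hands a duplicate to the worker thread; the caller keeps the original.
    static boolean runWithDuplicate(StatefulSerializer original) {
        StatefulSerializer forWorker = original.duplicate();
        boolean[] workerOk = new boolean[1];

        // From here on, only the worker thread touches forWorker.
        Thread worker = new Thread(() -> workerOk[0] = forWorker.serialize("worker") == 1);
        worker.start();

        // The calling thread continues to use its own instance concurrently.
        boolean callerOk = original.serialize("caller") == 1;

        try {
            worker.join(); // join() makes the worker's write to workerOk visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return workerOk[0] && callerOk && forWorker != original;
    }
}
```

Because each thread owns exactly one instance, no synchronization on the serializer itself is needed.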
   
   In this particular case I believe the problem is in `org.apache.flink.runtime.operators.BatchTask#initInputLocalStrategy:1010`. We pass a serializer to the `ExternalSorter`, which spawns an additional thread for reading/sorting. However, `org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory#getSerializer` does not call `duplicate()` the first time it is called, so the sorter's new thread receives the original serializer. Later, in `BatchTask#run`, we call `Driver#prepare`, which calls `duplicate()` on that same serializer while the sorter thread is using it, and this causes the `ConcurrentModificationException`.
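The first-call behaviour described above can be modelled roughly as follows. This is a simplified sketch, not the Flink source: `SketchSerializer`, `FirstCallFactory` and the `original()` accessor are hypothetical names used only to show why the first caller ends up holding the shared instance.

```java
// Hypothetical stand-in for a serializer; not Flink's TypeSerializer.
final class SketchSerializer {
    SketchSerializer duplicate() {
        return new SketchSerializer();
    }
}

// Simplified model of a factory that skips duplicate() on its first call.
final class FirstCallFactory {
    private final SketchSerializer original;
    private boolean firstCall = true;

    FirstCallFactory(SketchSerializer original) {
        this.original = original;
    }

    SketchSerializer getSerializer() {
        if (firstCall) {
            firstCall = false;
            return original; // first caller receives the shared original
        }
        return original.duplicate(); // later callers receive their own copy
    }

    // Exposed only so the sketch can compare instance identity.
    SketchSerializer original() {
        return original;
    }
}
```

In this model, calling `getSerializer().duplicate()` at the call site always yields an instance distinct from the original, regardless of whether the call happens to be the first one.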
   
   I'd suggest fixing the issue in `BatchTask` by calling `duplicate()` explicitly:
   ```
       private void initInputLocalStrategy(int inputNum) throws Exception {
   ....
                   case SORT:
                       @SuppressWarnings({"rawtypes", "unchecked"})
                       Sorter<?> sorter =
                               ExternalSorter.newBuilder(
                                               getMemoryManager(),
                                               this,
                                               // we must duplicate the serializer as it will be used in a reading thread of the sorter
                                               this.inputSerializers[inputNum].getSerializer().duplicate(),
                                               getLocalStrategyComparator(inputNum))
   ....
   }
   ```

