Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/08 09:19:47 UTC

[GitHub] [beam] baeminbo commented on a diff in pull request #17201: [BEAM-14187] Fix concurrency issue in IsmReaderImpl

baeminbo commented on code in PR #17201:
URL: https://github.com/apache/beam/pull/17201#discussion_r845915573


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java:
##########
@@ -370,7 +373,7 @@ boolean bloomFilterMightContain(RandomAccessData keyBytes) {
     position(rawChannel, footer.getBloomFilterPosition());
     bloomFilter = ScalableBloomFilterCoder.of().decode(Channels.newInputStream(rawChannel));
 
-    indexPerShard = new HashMap<>();
+    indexPerShard = new ConcurrentHashMap<>();

Review Comment:
   `indexPerShard` is updated at [initializeForKeyedRead](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L562), which is not synchronized. 
   
   #### IllegalStateException
   Let's assume that threads T1 and T2 call `overKeyComponents` and enter `initializeForKeyedRead` for shardId `K` at the same time, and that `indexPerShard` doesn't have `K`:
   
   1. T1 checks [`indexPerShard.containsKey(K)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L495) and it returns `false`. 
   2. T2 checks [`indexPerShard.containsKey(K)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L495) and it returns `false`.
   3. T1 advances quickly and invokes [`indexPerShard.put(K, ...)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L562). Now, `indexPerShard` has `K`.
   4. T2 checks [`checkState(indexPerShard.get(K) == null)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L500-L503). Because `K` is now present, it throws `IllegalStateException`.
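
The four steps above can be replayed deterministically in a single thread by running the operations in the interleaved order. This is only a sketch: `CheckThenActReplay`, its local `checkState` helper, the `String` value type, and the shard id `7` are hypothetical stand-ins, not code from `IsmReaderImpl`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the check-then-act pattern described above; not Beam code. */
final class CheckThenActReplay {

  /** Replays steps 1-4 in one thread, in the order the two threads would interleave. */
  static boolean replayThrowsIllegalState() {
    Map<Integer, String> indexPerShard = new HashMap<>();
    int k = 7; // shard id K (arbitrary value for illustration)

    boolean t1SawKey = indexPerShard.containsKey(k); // step 1: T1 sees false
    boolean t2SawKey = indexPerShard.containsKey(k); // step 2: T2 sees false

    if (!t1SawKey) {
      indexPerShard.put(k, "index built by T1"); // step 3: T1 publishes the index
    }

    try {
      if (!t2SawKey) {
        // step 4: T2 still believes K is absent, so its precondition fails.
        checkState(indexPerShard.get(k) == null);
      }
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  /** Minimal local equivalent of a precondition check; assumed for this sketch. */
  static void checkState(boolean condition) {
    if (!condition) {
      throw new IllegalStateException("index for shard is already initialized");
    }
  }
}
```

The replay does not depend on which `Map` implementation is used: the window sits between the `containsKey` check and the later precondition, so the same interleaving would fail with any map unless the check and the insert are made one atomic step per shard.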
   
   #### NullPointerException
   Let's assume that threads T1 and T2 call `overKeyComponents`. T1 is for shardId `K1` while T2 is for shardId `K2`, and `indexPerShard` has `K1`:
   
   1. T1 returns from `initializeForKeyedRead` as [`indexPerShard.containsKey(K1)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L495) is `true`.
   2. T1 stops at [`indexPerShard.get(K1)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L210) in `overKeyComponents`.
   3. T2 advances and invokes [`indexPerShard.put(K2, ...)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L562) in `initializeForKeyedRead`.
   4. T1 can get `null` from `indexPerShard.get(K1)`, and will throw `NullPointerException` as it tries to invoke [`floorKey`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L210).
   
   My theory on this is that 
   * T2 is in `HashMap.resize()`, which works like the linked [`Hashtable.rehash()`](https://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/Hashtable.java#l389): a `newMap` is created and then entries in `oldMap` are copied to `newMap`.
   * T1 accesses `newMap` before copy finishes. So, it sees `null` for `K1` which exists in `oldMap` but not in `newMap`.
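
The theory above can be illustrated with a toy table that publishes its enlarged, still-empty array before copying the old entries into it. This is a simplified model under stated assumptions, not JDK source: `ResizeRaceModel`, its `Entry` class, and the two-phase `publishNewTable` / `copyFrom` split are hypothetical, keys are assumed non-negative, and collisions are ignored. The replay runs in one thread and places T1's read between the two phases of T2's resize.

```java
/** Toy model of publish-then-copy resizing; hypothetical, not the JDK's HashMap. */
final class ResizeRaceModel {

  static final class Entry {
    final int key;
    final String value;

    Entry(int key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  private Entry[] table = new Entry[2];

  /** Looks up a key in whichever table is currently published. */
  String get(int key) {
    Entry e = table[key % table.length];
    return (e != null && e.key == key) ? e.value : null;
  }

  /** Stores an entry without resizing; collisions simply overwrite in this toy model. */
  void put(int key, String value) {
    table[key % table.length] = new Entry(key, value);
  }

  /** Resize phase 1: publish a larger, still-empty table and return the old one. */
  Entry[] publishNewTable() {
    Entry[] oldTable = table;
    table = new Entry[oldTable.length * 2];
    return oldTable;
  }

  /** Resize phase 2: copy the old entries into the published table. */
  void copyFrom(Entry[] oldTable) {
    for (Entry e : oldTable) {
      if (e != null) {
        table[e.key % table.length] = e;
      }
    }
  }

  /** Returns the value seen for K1 before, during, and after the resize. */
  static String[] replay() {
    ResizeRaceModel indexPerShard = new ResizeRaceModel();
    int k1 = 1;
    indexPerShard.put(k1, "index for K1");

    String before = indexPerShard.get(k1);
    Entry[] oldTable = indexPerShard.publishNewTable(); // T2 starts resizing
    String during = indexPerShard.get(k1);              // T1 reads the empty new table
    indexPerShard.copyFrom(oldTable);                   // T2 finishes copying
    String after = indexPerShard.get(k1);

    return new String[] {before, during, after};
  }
}
```

In this model the middle read returns `null` even though `K1` was never removed, which is the value that would make the subsequent `floorKey` call fail.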
   


