Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/01 01:16:17 UTC

[GitHub] [beam] lukecwik opened a new pull request #17240: [BEAM-13519] Solve race issues when the server responds with an error before the GrpcStateClient finishes.

lukecwik opened a new pull request #17240:
URL: https://github.com/apache/beam/pull/17240


   The issue was that the InboundObserver can be invoked before outboundObserverFactory#outboundObserverFor returns. This means that when the server responds with an error,
   cache.remove (invoked via onError and closeAndCleanup on the gRPC thread) can be waiting on the cache while cache.computeIfAbsent is still running for the same key.
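A minimal standalone sketch (not Beam code) of the hazard described above, assuming the old `ConcurrentHashMap#computeIfAbsent` usage: while the mapping function runs, the map holds a lock on the key's bin, and its Javadoc warns that some updates from other threads may block until the computation finishes. The mapping function stands in for the `GrpcStateClient` constructor, and a second thread stands in for the gRPC thread calling `cache.remove`; `ComputeIfAbsentRaceDemo` and its methods are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

final class ComputeIfAbsentRaceDemo {
  /**
   * Runs computeIfAbsent whose mapping function (standing in for a client constructor) starts a
   * second "callback" thread that calls remove() on the same key, and reports that thread's state.
   */
  static Thread.State callbackStateDuringConstruction() {
    ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
    Thread[] callback = new Thread[1];
    Thread.State[] observed = new Thread.State[1];

    cache.computeIfAbsent("descriptor", key -> {
      // Simulates onError -> closeAndCleanup firing on another thread before construction ends.
      callback[0] = new Thread(() -> cache.remove(key));
      callback[0].start();
      observed[0] = awaitBlockedOrDone(callback[0], 2_000);
      return new Object();
    });
    // Once computeIfAbsent returns, the bin lock is released and remove() can proceed.
    try {
      callback[0].join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return observed[0];
  }

  /** Polls until the thread is BLOCKED or TERMINATED, or the timeout elapses. */
  private static Thread.State awaitBlockedOrDone(Thread t, long timeoutMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    Thread.State state = t.getState();
    while (state != Thread.State.BLOCKED
        && state != Thread.State.TERMINATED
        && System.nanoTime() - deadline < 0) {
      Thread.yield();
      state = t.getState();
    }
    return state;
  }
}
```

The callback thread sits in `BLOCKED` until `computeIfAbsent` returns. If the constructor instead waited on that thread, neither could make progress, which is one plausible way for the unit test to get stuck.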
   
   Another issue was that the outstandingRequests map within GrpcStateClient could be updated with another request during closeAndCleanup, meaning that the CompletableFuture for that request would never be completed exceptionally.
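One common way to close this gap, sketched under assumptions (a hypothetical `OutstandingRequestTracker`, not the actual `GrpcStateClient` code): mark the client closed and snapshot the outstanding futures under the same lock that guards registration, so a request registered after the close fails immediately instead of waiting forever.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical request tracker (not Beam's GrpcStateClient). Registration and close share one
 * lock, so every future is completed even if a request races with closeAndCleanup.
 */
final class OutstandingRequestTracker {
  private final Map<String, CompletableFuture<String>> outstandingRequests = new HashMap<>();
  private boolean closed; // guarded by "this"
  private Throwable closedReason; // guarded by "this"

  /** Registers a request, or fails it immediately if the tracker is already closed. */
  CompletableFuture<String> register(String requestId) {
    CompletableFuture<String> future = new CompletableFuture<>();
    Throwable reason;
    synchronized (this) {
      if (!closed) {
        outstandingRequests.put(requestId, future);
        return future;
      }
      reason = closedReason;
    }
    future.completeExceptionally(reason);
    return future;
  }

  /** Completes a request normally when its response arrives. */
  void onResponse(String requestId, String response) {
    CompletableFuture<String> future;
    synchronized (this) {
      future = outstandingRequests.remove(requestId);
    }
    if (future != null) {
      future.complete(response);
    }
  }

  /** Marks the tracker closed and fails every request registered before the close. */
  void closeAndCleanup(Throwable reason) {
    List<CompletableFuture<String>> toFail;
    synchronized (this) {
      if (closed) {
        return;
      }
      closed = true;
      closedReason = reason;
      toFail = new ArrayList<>(outstandingRequests.values());
      outstandingRequests.clear();
    }
    // Complete outside the lock so dependent callbacks never run while it is held.
    for (CompletableFuture<String> future : toFail) {
      future.completeExceptionally(reason);
    }
  }
}
```

Because `closed` is checked and set under the same lock as the map, there is no window in which a request can be added after the snapshot is taken.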
   
   Ran the unit test locally 1000 times, and it never got stuck.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] damccorm commented on a change in pull request #17240: [BEAM-13519] Solve race issues when the server responds with an error before the GrpcStateClient finishes being constructed.

Posted by GitBox <gi...@apache.org>.
damccorm commented on a change in pull request #17240:
URL: https://github.com/apache/beam/pull/17240#discussion_r840539459



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java
##########
@@ -59,74 +58,108 @@ public BeamFnStateGrpcClientCache(
     // This showed a 1-2% improvement in the ProcessBundleBenchmark#testState* benchmarks.
     this.channelFactory = channelFactory.withDirectExecutor();
     this.outboundObserverFactory = outboundObserverFactory;
-    this.cache = new ConcurrentHashMap<>();
+    this.cache = new HashMap<>();
   }
 
   /**
    * Creates or returns an existing {@link BeamFnStateClient} depending on whether the passed in
    * {@link ApiServiceDescriptor} currently has a {@link BeamFnStateClient} bound to the same
    * channel.
    */
-  public BeamFnStateClient forApiServiceDescriptor(ApiServiceDescriptor apiServiceDescriptor)
-      throws IOException {
-    return cache.computeIfAbsent(apiServiceDescriptor, this::createBeamFnStateClient);
-  }
-
-  private BeamFnStateClient createBeamFnStateClient(ApiServiceDescriptor apiServiceDescriptor) {
-    return new GrpcStateClient(apiServiceDescriptor);
+  public synchronized BeamFnStateClient forApiServiceDescriptor(
+      ApiServiceDescriptor apiServiceDescriptor) throws IOException {
+    // We specifically are synchronized so that we only create one GrpcStateClient at a time
+    // preventing a race where multiple GrpcStateClient objects might be constructed at the same
+    // time for the same ApiServiceDescriptor.
+    BeamFnStateClient rval;
+    synchronized (cache) {
+      rval = cache.get(apiServiceDescriptor);
+    }
+    if (rval == null) {
+      // We can't be synchronized on cache while constructing the GrpcStateClient since if the
+      // connection fails, onError may be invoked from the gRPC thread which will invoke
+      // closeAndCleanUp that clears the cache.
+      rval = new GrpcStateClient(apiServiceDescriptor);
+      synchronized (cache) {
+        cache.put(apiServiceDescriptor, rval);

Review comment:
       I'm not super familiar with the context here, but does this open us up to another race condition? Specifically, could you run into a case where:
   
   1. The cache has no client
   2. Thread 1 checks if the cache has a client (it doesn't)
   3. Thread 2 checks if the cache has a client (it doesn't)
   4. Thread 1 creates a new client and puts it in the cache.
   5. Thread 2 creates a new client and puts it in the cache (overwriting what thread 1 put there)
   6. Threads 1 and 2 have different clients
   
   I think if this is a problem, we could get around it by doing an extra check in the synchronized block to see whether the cache already contains a client (and if it does, we can just dispose of the one we just created).
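The re-check suggested above might look roughly like this. `RecheckingCache`, its `disposer` callback, and `RecheckingCacheDemo` are hypothetical; the point is that construction happens outside the lock, and `putIfAbsent` inside the second synchronized block decides which instance wins. The demo forces both threads past the first check with a `CyclicBarrier`, so exactly one extra instance is created and then disposed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.function.Consumer;
import java.util.function.Function;

final class RecheckingCache<K, V> {
  private final Map<K, V> cache = new HashMap<>(); // guarded by "cache"
  private final Function<K, V> factory;
  private final Consumer<V> disposer;

  RecheckingCache(Function<K, V> factory, Consumer<V> disposer) {
    this.factory = factory;
    this.disposer = disposer;
  }

  V get(K key) {
    synchronized (cache) {
      V existing = cache.get(key);
      if (existing != null) {
        return existing;
      }
    }
    // Construct without holding the lock so callbacks that need it cannot deadlock.
    V created = factory.apply(key);
    V winner;
    synchronized (cache) {
      // Re-check: another thread may have installed a value while we were constructing.
      winner = cache.putIfAbsent(key, created);
    }
    if (winner == null) {
      return created;
    }
    disposer.accept(created); // Lost the race: dispose of ours and share the existing one.
    return winner;
  }
}

final class RecheckingCacheDemo {
  /** Forces two concurrent constructions; returns {sameInstance ? 1 : 0, disposedCount}. */
  static int[] raceTwoThreads() {
    CyclicBarrier bothConstructing = new CyclicBarrier(2);
    ConcurrentLinkedQueue<Object> disposed = new ConcurrentLinkedQueue<>();
    RecheckingCache<String, Object> cache = new RecheckingCache<String, Object>(key -> {
      try {
        bothConstructing.await(); // Both threads have passed the first check by now.
      } catch (InterruptedException | BrokenBarrierException e) {
        throw new IllegalStateException("construction interrupted", e);
      }
      return new Object();
    }, disposed::add);

    Object[] results = new Object[2];
    Thread t1 = new Thread(() -> results[0] = cache.get("descriptor"));
    Thread t2 = new Thread(() -> results[1] = cache.get("descriptor"));
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new int[] {results[0] == results[1] ? 1 : 0, disposed.size()};
  }
}
```

This pattern lets different keys be constructed in parallel, at the cost of occasionally building and discarding a client.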







[GitHub] [beam] lukecwik commented on pull request #17240: [BEAM-13519] Solve race issues when the server responds with an error before the GrpcStateClient finishes being constructed.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #17240:
URL: https://github.com/apache/beam/pull/17240#issuecomment-1086171578


   > This generally looks good to me in terms of providing good thread safety. I'd appreciate another review from someone who has a little more context on the change, though (did you mean to tag @youngoli maybe?)
   
   R: @youngoli 





[GitHub] [beam] lukecwik commented on pull request #17240: [BEAM-13519] Solve race issues when the server responds with an error before the GrpcStateClient finishes.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #17240:
URL: https://github.com/apache/beam/pull/17240#issuecomment-1085295399


   R: @damccorm 
   CC: @kileys @TheNeuralBit 





[GitHub] [beam] lukecwik commented on a change in pull request #17240: [BEAM-13519] Solve race issues when the server responds with an error before the GrpcStateClient finishes being constructed.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #17240:
URL: https://github.com/apache/beam/pull/17240#discussion_r840792682




Review comment:
       No. Since the method is synchronized, we know that only one thread at a time can be executing it. The additional lock around `cache` is to ensure proper usage with the GrpcStateClient object, which clears the cache from the gRPC thread in closeAndCleanUp.
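A minimal sketch of the two-lock layout described in this reply, with hypothetical names (`TwoLockClientCache`, `cleanUp`, `TwoLockDemo`) rather than Beam's: the `synchronized` method serializes callers, so only one thread at a time can check, construct, and insert, while the inner `synchronized (cache)` blocks are the only lock a cleanup callback needs. The factory simulates an error callback that runs `cleanUp` on another thread and waits for it; that only completes because the `cache` lock is not held during construction.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

final class TwoLockClientCache<K, V> {
  private final Map<K, V> cache = new HashMap<>(); // guarded by "cache"
  private final BiFunction<TwoLockClientCache<K, V>, K, V> factory;

  TwoLockClientCache(BiFunction<TwoLockClientCache<K, V>, K, V> factory) {
    this.factory = factory;
  }

  /** Outer lock ("this"): only one thread at a time can check, construct, and insert. */
  synchronized V forKey(K key) {
    V rval;
    synchronized (cache) {
      rval = cache.get(key);
    }
    if (rval == null) {
      // The cache lock is not held here, so a cleanup callback on another thread can run.
      rval = factory.apply(this, key);
      synchronized (cache) {
        cache.put(key, rval);
      }
    }
    return rval;
  }

  /** Called from a (hypothetical) error-callback thread; takes only the inner lock. */
  void cleanUp(K key) {
    synchronized (cache) {
      cache.remove(key);
    }
  }
}

final class TwoLockDemo {
  /** Returns {constructions, sameInstance ? 1 : 0} after two threads request the same key. */
  static int[] run() {
    AtomicInteger constructions = new AtomicInteger();
    TwoLockClientCache<String, Object> clients =
        new TwoLockClientCache<String, Object>((self, key) -> {
          constructions.incrementAndGet();
          // Simulate an error callback on another thread that cleans up mid-construction.
          Thread callback = new Thread(() -> self.cleanUp(key));
          callback.start();
          joinUninterruptibly(callback); // Would deadlock if the cache lock were held here.
          return new Object();
        });
    Object[] results = new Object[2];
    Thread t1 = new Thread(() -> results[0] = clients.forKey("descriptor"));
    Thread t2 = new Thread(() -> results[1] = clients.forKey("descriptor"));
    t1.start();
    t2.start();
    joinUninterruptibly(t1);
    joinUninterruptibly(t2);
    return new int[] {constructions.get(), results[0] == results[1] ? 1 : 0};
  }

  private static void joinUninterruptibly(Thread t) {
    try {
      t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

With the outer lock in place, the second caller finds the cached instance, so the interleaving raised in review cannot produce two clients. The trade-off is that every caller, even for a different descriptor, waits behind a single slow construction.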







[GitHub] [beam] lukecwik commented on pull request #17240: [BEAM-13519] Solve race issues when the server responds with an error before the GrpcStateClient finishes being constructed.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #17240:
URL: https://github.com/apache/beam/pull/17240#issuecomment-1086173083


   Run Java PreCommit

