Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/01 17:38:06 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #17240: [BEAM-13519] Solve race issues when the server responds with an error before the GrpcStateClient finishes being constructed.

lukecwik commented on a change in pull request #17240:
URL: https://github.com/apache/beam/pull/17240#discussion_r840792682



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java
##########
@@ -59,74 +58,108 @@ public BeamFnStateGrpcClientCache(
     // This showed a 1-2% improvement in the ProcessBundleBenchmark#testState* benchmarks.
     this.channelFactory = channelFactory.withDirectExecutor();
     this.outboundObserverFactory = outboundObserverFactory;
-    this.cache = new ConcurrentHashMap<>();
+    this.cache = new HashMap<>();
   }
 
   /**
    * Creates or returns an existing {@link BeamFnStateClient} depending on whether the passed in
    * {@link ApiServiceDescriptor} currently has a {@link BeamFnStateClient} bound to the same
    * channel.
    */
-  public BeamFnStateClient forApiServiceDescriptor(ApiServiceDescriptor apiServiceDescriptor)
-      throws IOException {
-    return cache.computeIfAbsent(apiServiceDescriptor, this::createBeamFnStateClient);
-  }
-
-  private BeamFnStateClient createBeamFnStateClient(ApiServiceDescriptor apiServiceDescriptor) {
-    return new GrpcStateClient(apiServiceDescriptor);
+  public synchronized BeamFnStateClient forApiServiceDescriptor(
+      ApiServiceDescriptor apiServiceDescriptor) throws IOException {
+    // We specifically are synchronized so that we only create one GrpcStateClient at a time,
+    // preventing a race where multiple GrpcStateClient objects might be constructed at the same
+    // time for the same ApiServiceDescriptor.
+    BeamFnStateClient rval;
+    synchronized (cache) {
+      rval = cache.get(apiServiceDescriptor);
+    }
+    if (rval == null) {
+      // We can't be synchronized on cache while constructing the GrpcStateClient since if the
+      // connection fails, onError may be invoked from the gRPC thread which will invoke
+      // closeAndCleanUp that clears the cache.
+      rval = new GrpcStateClient(apiServiceDescriptor);
+      synchronized (cache) {
+        cache.put(apiServiceDescriptor, rval);

Review comment:
       No. The method itself is synchronized, so only one thread at a time can be executing it. The additional lock around `cache` is there to coordinate with the GrpcStateClient object: its `onError` can run on the gRPC thread and invoke `closeAndCleanUp`, which clears the cache.
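
       To illustrate the two levels of locking, here is a minimal, self-contained sketch. It is not the Beam code: `ClientCacheSketch`, `Client`, `forEndpoint`, `constructedCount`, and `size` are hypothetical names, a `String` stands in for `ApiServiceDescriptor`, and the error callback removes only its own entry rather than reproducing the real `closeAndCleanUp`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the client cache; all names here are illustrative only. */
class ClientCacheSketch {

  /** Hypothetical stand-in for GrpcStateClient. */
  final class Client {
    final String endpoint;

    Client(String endpoint) {
      this.endpoint = endpoint;
    }

    /**
     * Simulates an error callback that may arrive on another thread.
     * It takes only the cache lock, never the method-level lock.
     */
    void onError() {
      synchronized (cache) {
        // Remove the mapping only if it still points at this client.
        cache.remove(endpoint, this);
      }
    }
  }

  // Plain HashMap: every access is guarded by synchronized (cache).
  private final Map<String, Client> cache = new HashMap<>();
  private int constructed = 0;

  /** Method-level lock: at most one thread creates or looks up a client at a time. */
  synchronized Client forEndpoint(String endpoint) {
    Client rval;
    synchronized (cache) {
      rval = cache.get(endpoint);
    }
    if (rval == null) {
      // Constructed outside the cache lock, so an error callback that fires
      // during construction is not blocked on a lock held by this thread.
      rval = new Client(endpoint);
      constructed++;
      synchronized (cache) {
        cache.put(endpoint, rval);
      }
    }
    return rval;
  }

  synchronized int constructedCount() {
    return constructed;
  }

  int size() {
    synchronized (cache) {
      return cache.size();
    }
  }

  public static void main(String[] args) {
    ClientCacheSketch sketch = new ClientCacheSketch();
    Client first = sketch.forEndpoint("localhost:1234");
    Client second = sketch.forEndpoint("localhost:1234");
    System.out.println("same instance: " + (first == second));
    first.onError();
    System.out.println("entries after error: " + sketch.size());
  }
}
```

       In this sketch the method-level lock is what prevents duplicate construction, while the inner `synchronized (cache)` blocks only protect the `HashMap` against the callback thread. Under these assumptions, swapping `ConcurrentHashMap` for `HashMap` is safe because no access to the map happens outside the cache lock.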



