You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/07/07 23:21:35 UTC

[beam] branch master updated: Optimize locking in several critical-path methods (#22162)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e14f5181d8 Optimize locking in several critical-path methods (#22162)
0e14f5181d8 is described below

commit 0e14f5181d8180b3fd88fd83d39f9199a5db99ae
Author: Steven Niemitz <st...@gmail.com>
AuthorDate: Thu Jul 7 19:21:30 2022 -0400

    Optimize locking in several critical-path methods (#22162)
    
    * Optimize locking in several critical-path methods
    
    * spotbugs
    
    * Apply suggestions from code review
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * review comments
    
    * unit test for concurrency
    
    * the buddy of bytes
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
---
 .../beam/sdk/options/ProxyInvocationHandler.java   | 263 +++++++++++++--------
 .../reflect/ByteBuddyDoFnInvokerFactory.java       |  31 +--
 .../sdk/transforms/reflect/DoFnSignatures.java     |  14 +-
 .../sdk/options/ProxyInvocationHandlerTest.java    | 123 ++++++++++
 4 files changed, 315 insertions(+), 116 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index d9524115de9..77324eb7f18 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -52,6 +52,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.sdk.options.PipelineOptionsFactory.AnnotationPredicates;
@@ -63,14 +64,15 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Defaults;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ClassToInstanceMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableClassToInstanceMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.MutableClassToInstanceMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
@@ -96,21 +98,64 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
    */
   private final int hashCode = ThreadLocalRandom.current().nextInt();
 
-  private final Set<Class<? extends PipelineOptions>> knownInterfaces;
+  private static final class ComputedProperties {
+    final ImmutableClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
+    final ImmutableMap<String, String> gettersToPropertyNames;
+    final ImmutableMap<String, String> settersToPropertyNames;
+    final ImmutableSet<Class<? extends PipelineOptions>> knownInterfaces;
+
+    private ComputedProperties(
+        ImmutableClassToInstanceMap<PipelineOptions> interfaceToProxyCache,
+        ImmutableMap<String, String> gettersToPropertyNames,
+        ImmutableMap<String, String> settersToPropertyNames,
+        ImmutableSet<Class<? extends PipelineOptions>> knownInterfaces) {
+      this.interfaceToProxyCache = interfaceToProxyCache;
+      this.gettersToPropertyNames = gettersToPropertyNames;
+      this.settersToPropertyNames = settersToPropertyNames;
+      this.knownInterfaces = knownInterfaces;
+    }
 
-  // ProxyInvocationHandler implements Serializable only for the sake of throwing an informative
-  // exception in writeObject()
+    <T extends PipelineOptions> ComputedProperties updated(
+        Class<T> iface, T instance, List<PropertyDescriptor> propertyDescriptors) {
+
+      // these all use mutable maps and then copyOf, rather than a builder because builders enforce
+      // all keys are unique, and its possible they are not here.
+      Map<String, String> allNewGetters = Maps.newHashMap(gettersToPropertyNames);
+      Map<String, String> allNewSetters = Maps.newHashMap(settersToPropertyNames);
+      Set<Class<? extends PipelineOptions>> newKnownInterfaces = Sets.newHashSet(knownInterfaces);
+      Map<Class<? extends PipelineOptions>, PipelineOptions> newInterfaceCache =
+          Maps.newHashMap(interfaceToProxyCache);
+
+      allNewGetters.putAll(generateGettersToPropertyNames(propertyDescriptors));
+      allNewSetters.putAll(generateSettersToPropertyNames(propertyDescriptors));
+      newKnownInterfaces.add(iface);
+      newInterfaceCache.put(iface, instance);
+
+      return new ComputedProperties(
+          ImmutableClassToInstanceMap.copyOf(newInterfaceCache),
+          ImmutableMap.copyOf(allNewGetters),
+          ImmutableMap.copyOf(allNewSetters),
+          ImmutableSet.copyOf(newKnownInterfaces));
+    }
+  }
+
+  /** Only modified while holding a lock on {@code this}. */
   @SuppressFBWarnings("SE_BAD_FIELD")
-  private final ClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
+  private volatile ComputedProperties computedProperties;
 
   // ProxyInvocationHandler implements Serializable only for the sake of throwing an informative
   // exception in writeObject()
+  /**
+   * Enumerating {@code options} must always be done on a copy made before accessing or deriving
+   * properties from {@code computedProperties} since concurrent hash maps are <a
+   * href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly>weakly
+   * consistent</a>. This will allow us to ensure that the keys in {code options} will always be a
+   * subset of properties stored in {code computedProperties}.
+   */
   @SuppressFBWarnings("SE_BAD_FIELD")
-  private final Map<String, BoundValue> options;
+  private final ConcurrentHashMap<String, BoundValue> options;
 
-  private final Map<String, JsonNode> jsonOptions;
-  private final Map<String, String> gettersToPropertyNames;
-  private final Map<String, String> settersToPropertyNames;
+  private final ImmutableMap<String, JsonNode> jsonOptions;
 
   ProxyInvocationHandler(Map<String, Object> options) {
     this(bindOptions(options), Maps.newHashMap());
@@ -127,15 +172,18 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
 
   private ProxyInvocationHandler(
       Map<String, BoundValue> options, Map<String, JsonNode> jsonOptions) {
-    this.options = options;
-    this.jsonOptions = jsonOptions;
-    this.knownInterfaces = new HashSet<>(PipelineOptionsFactory.getRegisteredOptions());
-    gettersToPropertyNames = Maps.newHashMap();
-    settersToPropertyNames = Maps.newHashMap();
-    interfaceToProxyCache = MutableClassToInstanceMap.create();
+    this.options = new ConcurrentHashMap<>(options);
+    this.jsonOptions = ImmutableMap.copyOf(jsonOptions);
+    this.computedProperties =
+        new ComputedProperties(
+            ImmutableClassToInstanceMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableSet.copyOf(PipelineOptionsFactory.getRegisteredOptions()));
   }
 
   @Override
+  @SuppressFBWarnings("AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION")
   public Object invoke(Object proxy, Method method, Object[] args) {
     if (args == null && "toString".equals(method.getName())) {
       return toString();
@@ -158,29 +206,33 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
       return Void.TYPE;
     }
     String methodName = method.getName();
-    synchronized (this) {
-      if (gettersToPropertyNames.containsKey(methodName)) {
-        String propertyName = gettersToPropertyNames.get(methodName);
-        if (!options.containsKey(propertyName)) {
-          // Lazy bind the default to the method.
-          Object value =
-              jsonOptions.containsKey(propertyName)
-                  ? getValueFromJson(propertyName, method)
-                  : getDefault((PipelineOptions) proxy, method);
-          options.put(propertyName, BoundValue.fromDefault(value));
-        }
-        return options.get(propertyName).getValue();
-      } else if (settersToPropertyNames.containsKey(methodName)) {
-        options.put(settersToPropertyNames.get(methodName), BoundValue.fromExplicitOption(args[0]));
-        return Void.TYPE;
+
+    ComputedProperties properties = computedProperties;
+    if (properties.gettersToPropertyNames.containsKey(methodName)) {
+      String propertyName = properties.gettersToPropertyNames.get(methodName);
+      // we can't use computeIfAbsent here because evaluating the default may cause more properties
+      // to be evaluated, and computeIfAbsent is not re-entrant.
+      if (!options.containsKey(propertyName)) {
+        // Lazy bind the default to the method.
+        Object value =
+            jsonOptions.containsKey(propertyName)
+                ? getValueFromJson(propertyName, method)
+                : getDefault((PipelineOptions) proxy, method);
+        options.put(propertyName, BoundValue.fromDefault(value));
       }
+      return options.get(propertyName).getValue();
+    } else if (properties.settersToPropertyNames.containsKey(methodName)) {
+      options.put(
+          properties.settersToPropertyNames.get(methodName),
+          BoundValue.fromExplicitOption(args[0]));
+      return Void.TYPE;
     }
     throw new RuntimeException(
         "Unknown method [" + method + "] invoked with args [" + Arrays.toString(args) + "].");
   }
 
   public String getOptionName(Method method) {
-    return gettersToPropertyNames.get(method.getName());
+    return computedProperties.gettersToPropertyNames.get(method.getName());
   }
 
   private void writeObject(java.io.ObjectOutputStream stream) throws IOException {
@@ -222,25 +274,36 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
    * @param iface The interface that the returned object needs to implement.
    * @return An object that implements the interface {@code <T>}.
    */
-  synchronized <T extends PipelineOptions> T as(Class<T> iface) {
+  <T extends PipelineOptions> T as(Class<T> iface) {
     checkNotNull(iface);
     checkArgument(iface.isInterface(), "Not an interface: %s", iface);
-    if (!interfaceToProxyCache.containsKey(iface)) {
-      Registration<T> registration =
-          PipelineOptionsFactory.CACHE.get().validateWellFormed(iface, knownInterfaces);
-      List<PropertyDescriptor> propertyDescriptors = registration.getPropertyDescriptors();
-      Class<T> proxyClass = registration.getProxyClass();
-      gettersToPropertyNames.putAll(generateGettersToPropertyNames(propertyDescriptors));
-      settersToPropertyNames.putAll(generateSettersToPropertyNames(propertyDescriptors));
-      knownInterfaces.add(iface);
-      interfaceToProxyCache.putInstance(
-          iface,
-          InstanceBuilder.ofType(proxyClass)
-              .fromClass(proxyClass)
-              .withArg(InvocationHandler.class, this)
-              .build());
+
+    T existingOption = computedProperties.interfaceToProxyCache.getInstance(iface);
+    if (existingOption == null) {
+      synchronized (this) {
+        // double check
+        existingOption = computedProperties.interfaceToProxyCache.getInstance(iface);
+        if (existingOption == null) {
+          Registration<T> registration =
+              PipelineOptionsFactory.CACHE
+                  .get()
+                  .validateWellFormed(iface, computedProperties.knownInterfaces);
+          List<PropertyDescriptor> propertyDescriptors = registration.getPropertyDescriptors();
+
+          Class<T> proxyClass = registration.getProxyClass();
+
+          existingOption =
+              InstanceBuilder.ofType(proxyClass)
+                  .fromClass(proxyClass)
+                  .withArg(InvocationHandler.class, this)
+                  .build();
+
+          computedProperties =
+              computedProperties.updated(iface, existingOption, propertyDescriptors);
+        }
+      }
     }
-    return interfaceToProxyCache.getInstance(iface);
+    return existingOption;
   }
 
   /**
@@ -270,7 +333,8 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
 
   /** Returns a map of properties which correspond to {@link RuntimeValueProvider}. */
   public Map<String, Map<String, Object>> outputRuntimeOptions(PipelineOptions options) {
-    Set<PipelineOptionSpec> optionSpecs = PipelineOptionsReflector.getOptionSpecs(knownInterfaces);
+    Set<PipelineOptionSpec> optionSpecs =
+        PipelineOptionsReflector.getOptionSpecs(computedProperties.knownInterfaces);
     Map<String, Map<String, Object>> properties = Maps.newHashMap();
 
     for (PipelineOptionSpec spec : optionSpecs) {
@@ -301,12 +365,16 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
      */
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
+      // We must first make a copy of the current options because a concurrent modification
+      // may add a new option after we have derived optionSpecs but before we have enumerated
+      // all the pipeline options.
+      Map<String, BoundValue> copiedOptions = new HashMap<>(options);
       Set<PipelineOptionSpec> optionSpecs =
-          PipelineOptionsReflector.getOptionSpecs(knownInterfaces);
+          PipelineOptionsReflector.getOptionSpecs(computedProperties.knownInterfaces);
 
       Multimap<String, PipelineOptionSpec> optionsMap = buildOptionNameToSpecMap(optionSpecs);
 
-      for (Map.Entry<String, BoundValue> option : options.entrySet()) {
+      for (Map.Entry<String, BoundValue> option : copiedOptions.entrySet()) {
         BoundValue boundValue = option.getValue();
         if (boundValue.isDefault()) {
           continue;
@@ -333,7 +401,7 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
       }
 
       for (Map.Entry<String, JsonNode> jsonOption : jsonOptions.entrySet()) {
-        if (options.containsKey(jsonOption.getKey())) {
+        if (copiedOptions.containsKey(jsonOption.getKey())) {
           // Option overwritten since deserialization; don't re-write
           continue;
         }
@@ -483,10 +551,9 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
    * @return A pretty printed string representation of this.
    */
   @Override
-  public synchronized String toString() {
-    SortedMap<String, Object> sortedOptions = new TreeMap<>();
+  public String toString() {
     // Add the options that we received from deserialization
-    sortedOptions.putAll(jsonOptions);
+    SortedMap<String, Object> sortedOptions = new TreeMap<>(jsonOptions);
     // Override with any programmatically set options.
     for (Map.Entry<String, BoundValue> entry : options.entrySet()) {
       sortedOptions.put(entry.getKey(), entry.getValue().getValue());
@@ -555,7 +622,7 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
       }
     }
     if (method.getReturnType().equals(ValueProvider.class)) {
-      String propertyName = gettersToPropertyNames.get(method.getName());
+      String propertyName = computedProperties.gettersToPropertyNames.get(method.getName());
       return defaultObject == null
           ? new RuntimeValueProvider(
               method.getName(),
@@ -652,7 +719,7 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
   }
 
   static class Serializer extends JsonSerializer<PipelineOptions> {
-    private void serializeEntry(
+    private static void serializeEntry(
         String name,
         Object value,
         JsonGenerator jgen,
@@ -671,53 +738,53 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
     public void serialize(PipelineOptions value, JsonGenerator jgen, SerializerProvider provider)
         throws IOException {
       ProxyInvocationHandler handler = (ProxyInvocationHandler) Proxy.getInvocationHandler(value);
-      synchronized (handler) {
-        PipelineOptionsFactory.Cache cache = PipelineOptionsFactory.CACHE.get();
-        // We first filter out any properties that have been modified since
-        // the last serialization of this PipelineOptions and then verify that
-        // they are all serializable.
-        Map<String, BoundValue> filteredOptions = Maps.newHashMap(handler.options);
-        Map<String, JsonSerializer<Object>> propertyToSerializer =
-            getSerializerMap(cache, handler.knownInterfaces);
-        removeIgnoredOptions(cache, handler.knownInterfaces, filteredOptions);
-        ensureSerializable(cache, handler.knownInterfaces, filteredOptions, propertyToSerializer);
-
-        // Now we create the map of serializable options by taking the original
-        // set of serialized options (if any) and updating them with any properties
-        // instances that have been modified since the previous serialization.
-        Map<String, Object> serializableOptions = Maps.newHashMap(handler.jsonOptions);
-        for (Map.Entry<String, BoundValue> entry : filteredOptions.entrySet()) {
-          serializableOptions.put(entry.getKey(), entry.getValue().getValue());
-        }
-
-        jgen.writeStartObject();
-        jgen.writeFieldName("options");
+      PipelineOptionsFactory.Cache cache = PipelineOptionsFactory.CACHE.get();
+      // We first copy and then filter out any properties that have been modified since
+      // the last serialization of this PipelineOptions and then verify that
+      // they are all serializable.
+      Map<String, BoundValue> filteredOptions = Maps.newHashMap(handler.options);
+      Set<Class<? extends PipelineOptions>> knownInterfaces =
+          handler.computedProperties.knownInterfaces;
+      Map<String, JsonSerializer<Object>> propertyToSerializer =
+          getSerializerMap(cache, knownInterfaces);
+      removeIgnoredOptions(cache, knownInterfaces, filteredOptions);
+      ensureSerializable(cache, knownInterfaces, filteredOptions, propertyToSerializer);
+
+      // Now we create the map of serializable options by taking the original
+      // set of serialized options (if any) and updating them with any properties
+      // instances that have been modified since the previous serialization.
+      Map<String, Object> serializableOptions = Maps.newHashMap(handler.jsonOptions);
+      for (Map.Entry<String, BoundValue> entry : filteredOptions.entrySet()) {
+        serializableOptions.put(entry.getKey(), entry.getValue().getValue());
+      }
 
-        jgen.writeStartObject();
+      jgen.writeStartObject();
+      jgen.writeFieldName("options");
 
-        for (Map.Entry<String, Object> entry : serializableOptions.entrySet()) {
-          jgen.writeFieldName(entry.getKey());
-          serializeEntry(entry.getKey(), entry.getValue(), jgen, propertyToSerializer);
-        }
+      jgen.writeStartObject();
 
-        jgen.writeEndObject();
+      for (Map.Entry<String, Object> entry : serializableOptions.entrySet()) {
+        jgen.writeFieldName(entry.getKey());
+        serializeEntry(entry.getKey(), entry.getValue(), jgen, propertyToSerializer);
+      }
 
-        List<Map<String, Object>> serializedDisplayData = Lists.newArrayList();
-        DisplayData displayData = DisplayData.from(value);
-        for (DisplayData.Item item : displayData.items()) {
-          @SuppressWarnings("unchecked")
-          Map<String, Object> serializedItem =
-              PipelineOptionsFactory.MAPPER.convertValue(item, Map.class);
-          serializedDisplayData.add(serializedItem);
-        }
+      jgen.writeEndObject();
 
-        jgen.writeFieldName("display_data");
-        jgen.writeObject(serializedDisplayData);
-        jgen.writeEndObject();
+      List<Map<String, Object>> serializedDisplayData = Lists.newArrayList();
+      DisplayData displayData = DisplayData.from(value);
+      for (DisplayData.Item item : displayData.items()) {
+        @SuppressWarnings("unchecked")
+        Map<String, Object> serializedItem =
+            PipelineOptionsFactory.MAPPER.convertValue(item, Map.class);
+        serializedDisplayData.add(serializedItem);
       }
+
+      jgen.writeFieldName("display_data");
+      jgen.writeObject(serializedDisplayData);
+      jgen.writeEndObject();
     }
 
-    private Map<String, JsonSerializer<Object>> getSerializerMap(
+    private static Map<String, JsonSerializer<Object>> getSerializerMap(
         PipelineOptionsFactory.Cache cache, Set<Class<? extends PipelineOptions>> interfaces) {
 
       Map<String, JsonSerializer<Object>> propertyToSerializer = Maps.newHashMap();
@@ -738,7 +805,7 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
      * We remove all properties within the passed in options where there getter is annotated with
      * {@link JsonIgnore @JsonIgnore} from the passed in options using the passed in interfaces.
      */
-    private void removeIgnoredOptions(
+    private static void removeIgnoredOptions(
         PipelineOptionsFactory.Cache cache,
         Set<Class<? extends PipelineOptions>> interfaces,
         Map<String, ?> options) {
@@ -761,7 +828,7 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
      * We use an {@link ObjectMapper} to verify that the passed in options are serializable and
      * deserializable.
      */
-    private void ensureSerializable(
+    private static void ensureSerializable(
         PipelineOptionsFactory.Cache cache,
         Set<Class<? extends PipelineOptions>> interfaces,
         Map<String, BoundValue> options,
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index ff574bc6cf1..594162809bd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -24,9 +24,9 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -168,9 +168,13 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
    * A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class.
    * Needed because generating an invoker class is expensive, and to avoid generating an excessive
    * number of classes consuming PermGen memory.
+   *
+   * <p>Note that special care must be taken to enumerate this object as concurrent hash maps are <a
+   * href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly>weakly
+   * consistent</a>.
    */
   private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache =
-      new LinkedHashMap<>();
+      new ConcurrentHashMap<>();
 
   private ByteBuddyDoFnInvokerFactory() {}
 
@@ -291,19 +295,18 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
    * <p>These are cached such that at most one {@link DoFnInvoker} class exists for a given {@link
    * DoFn} class.
    */
-  private synchronized Constructor<?> getByteBuddyInvokerConstructor(DoFnSignature signature) {
+  private Constructor<?> getByteBuddyInvokerConstructor(DoFnSignature signature) {
     Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
-    Constructor<?> constructor = byteBuddyInvokerConstructorCache.get(fnClass);
-    if (constructor == null) {
-      Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature);
-      try {
-        constructor = invokerClass.getConstructor(fnClass);
-      } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
-        throw new RuntimeException(e);
-      }
-      byteBuddyInvokerConstructorCache.put(fnClass, constructor);
-    }
-    return constructor;
+    return byteBuddyInvokerConstructorCache.computeIfAbsent(
+        fnClass,
+        clazz -> {
+          Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature);
+          try {
+            return invokerClass.getConstructor(clazz);
+          } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
+            throw new RuntimeException(e);
+          }
+        });
   }
 
   /** Default implementation of {@link DoFn.SplitRestriction}, for delegation by bytebuddy. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 748c1b9fdb6..632c30a3d99 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -33,10 +33,10 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
@@ -114,7 +114,13 @@ public class DoFnSignatures {
 
   private DoFnSignatures() {}
 
-  private static final Map<Class<?>, DoFnSignature> signatureCache = new LinkedHashMap<>();
+  /**
+   * Note that special care must be taken to enumerate this object as concurrent hash maps are <a
+   * href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly>weakly
+   * consistent</a>.
+   */
+  private static final Map<Class<? extends DoFn<?, ?>>, DoFnSignature> signatureCache =
+      new ConcurrentHashMap<>();
 
   private static final ImmutableList<Class<? extends Parameter>>
       ALLOWED_NON_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS =
@@ -290,8 +296,8 @@ public class DoFnSignatures {
   }
 
   /** @return the {@link DoFnSignature} for the given {@link DoFn} subclass. */
-  public static synchronized <FnT extends DoFn<?, ?>> DoFnSignature getSignature(Class<FnT> fn) {
-    return signatureCache.computeIfAbsent(fn, k -> parseSignature(fn));
+  public static <FnT extends DoFn<?, ?>> DoFnSignature getSignature(Class<FnT> fn) {
+    return signatureCache.computeIfAbsent(fn, DoFnSignatures::parseSignature);
   }
 
   /**
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 4e55e9636f9..211d89d9b74 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -41,31 +41,47 @@ import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializerProvider;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.fasterxml.jackson.databind.util.TokenBuffer;
 import com.google.common.testing.EqualsTester;
 import java.io.IOException;
 import java.io.NotSerializableException;
 import java.io.Serializable;
+import java.lang.reflect.Method;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy.ByteBuddy;
+import org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy.description.modifier.Visibility;
+import org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy.description.type.TypeDescription;
+import org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy.dynamic.DynamicType;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
 import org.apache.commons.lang3.SystemUtils;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.hamcrest.Matchers;
@@ -1246,4 +1262,111 @@ public class ProxyInvocationHandlerTest {
     handler.as(BaseOptions.class);
     assertEquals("foo", handler.getOptionName(BaseOptions.class.getMethod("getFoo")));
   }
+
+  private DynamicType.Unloaded<? extends PipelineOptions> spinNewInterface(int methodNumber) {
+    return new ByteBuddy()
+        .makeInterface(PipelineOptions.class)
+        .defineMethod("getDynamicMethod" + methodNumber, String.class, Visibility.PUBLIC)
+        .withoutCode()
+        .defineMethod("setDynamicMethod" + methodNumber, Void.TYPE, Visibility.PUBLIC)
+        .withParameters(String.class)
+        .withoutCode()
+        .make();
+  }
+
+  private Map<Integer, Class<? extends PipelineOptions>> loadAllInterfaces(
+      int numInterfaces, ClassLoader classLoader) {
+    List<DynamicType.Unloaded<? extends PipelineOptions>> dynamicInterfaces =
+        IntStream.range(0, numInterfaces)
+            .mapToObj(this::spinNewInterface)
+            .collect(Collectors.toList());
+
+    DynamicType.Loaded<Object> root =
+        new ByteBuddy().subclass(Object.class).make().include(dynamicInterfaces).load(classLoader);
+
+    Map<TypeDescription, Class<?>> loadedInterfaces = root.getLoadedAuxiliaryTypes();
+    Map<Integer, Class<? extends PipelineOptions>> result = Maps.newHashMap();
+
+    IntStream.range(0, numInterfaces)
+        .forEach(
+            i -> {
+              DynamicType.Unloaded<? extends PipelineOptions> iface = dynamicInterfaces.get(i);
+              Class<?> clazz = loadedInterfaces.get(iface.getTypeDescription());
+              result.put(i, (Class<? extends PipelineOptions>) clazz);
+            });
+
+    return result;
+  }
+
+  @Test
+  public void testConcurrency() throws Exception {
+    int numInterfaces = 100;
+    int numWorkers = 10;
+    int numReaders = 10;
+    int step = numInterfaces / numWorkers;
+
+    ProxyInvocationHandler handler = new ProxyInvocationHandler(Maps.newHashMap());
+    ClassLoader cl = Thread.currentThread().getContextClassLoader();
+    Map<Integer, Class<? extends PipelineOptions>> ifaces = loadAllInterfaces(numInterfaces, cl);
+    CountDownLatch startWaiter = new CountDownLatch(1);
+    CountDownLatch done = new CountDownLatch(numWorkers);
+
+    ExecutorService executor = Executors.newFixedThreadPool(numWorkers + numReaders);
+    List<Future<?>> futs = Lists.newArrayList();
+
+    // launch `numWorkers` concurrent "writers" that will try to cast the proxy handler to various
+    // interfaces and set a value on them.
+    for (int start = 0; start < numInterfaces; start += step) {
+      final int s = start;
+      final int end = start + step;
+      Callable<Void> worker =
+          () -> {
+            startWaiter.await();
+            for (int i = s; i < end; i++) {
+              Class<? extends PipelineOptions> iface = ifaces.get(i);
+              Method setter = iface.getDeclaredMethod("setDynamicMethod" + i, String.class);
+              PipelineOptions opt1 = handler.as(iface);
+              setter.invoke(opt1, "test-" + i);
+            }
+            done.countDown();
+            return null;
+          };
+      futs.add(executor.submit(worker));
+    }
+
+    // launch concurrent readers that call `toString` and serializes the options, making sure they
+    // don't fail.
+    for (int i = 0; i < 10; i++) {
+      Callable<Void> worker =
+          () -> {
+            while (true) {
+              if (done.getCount() == 0) {
+                return null;
+              }
+              assertNotNull(handler.toString());
+              PipelineOptionsFactory.MAPPER.writeValue(
+                  ByteStreams.nullOutputStream(), handler.as(PipelineOptions.class));
+            }
+          };
+      futs.add(executor.submit(worker));
+    }
+
+    // wait for everything to finish
+    startWaiter.countDown();
+    for (Future<?> fut : futs) {
+      fut.get();
+    }
+
+    // ensure that the serialized json contains every value we set.
+    TokenBuffer tokenBuffer = new TokenBuffer(PipelineOptionsFactory.MAPPER, false);
+    PipelineOptionsFactory.MAPPER.writeValue(tokenBuffer, handler.as(PipelineOptions.class));
+
+    JsonNode tree = PipelineOptionsFactory.MAPPER.readTree(tokenBuffer.asParser());
+    JsonNode optionsNode = tree.get("options");
+    for (int i = 0; i < numInterfaces; i++) {
+      JsonNode methodNode = optionsNode.get("dynamicMethod" + i);
+      assertNotNull(methodNode);
+      assertEquals("test-" + i, methodNode.asText());
+    }
+  }
 }