Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/06 19:18:25 UTC

[GitHub] [beam] lukecwik commented on a diff in pull request #22162: Optimize locking in several critical-path methods

lukecwik commented on code in PR #22162:
URL: https://github.com/apache/beam/pull/22162#discussion_r915171497


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java:
##########
@@ -170,7 +170,7 @@ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(DoFn<InputT, Ou
    * number of classes consuming PermGen memory.

Review Comment:
   ```suggestion
      * number of classes consuming PermGen memory.
      *
      * <p>Note that special care must be taken to enumerate this object as concurrent hash maps are <a href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly">weakly consistent</a>.
   ```
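
To illustrate what "weakly consistent" means for enumeration, here is a minimal sketch. The class and method names are hypothetical and not part of Beam. It relies only on documented `java.util` behavior: a `ConcurrentHashMap` iterator never throws `ConcurrentModificationException` and may or may not reflect entries added after it was created, while a `HashMap` iterator fails fast.

```java
import java.util.ConcurrentModificationException;
import java.util.Map;

// Hypothetical demo class, not Beam code.
final class WeakConsistencyDemo {
  /** Returns true if the map tolerates an insertion while its keys are being iterated. */
  static boolean survivesAddDuringIteration(Map<String, Integer> map) {
    map.put("a", 1);
    map.put("b", 2);
    try {
      for (String key : map.keySet()) {
        // Structural modification on the first pass, plain overwrite afterwards.
        map.put("added", 0);
      }
      return true;
    } catch (ConcurrentModificationException e) {
      return false;
    }
  }
}
```

With a `ConcurrentHashMap` the loop completes, but whether it visits `"added"` is unspecified, which is why code that must see a stable set of keys should enumerate a copy.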



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java:
##########
@@ -671,50 +726,50 @@ private void serializeEntry(
     public void serialize(PipelineOptions value, JsonGenerator jgen, SerializerProvider provider)
         throws IOException {
       ProxyInvocationHandler handler = (ProxyInvocationHandler) Proxy.getInvocationHandler(value);
-      synchronized (handler) {
-        PipelineOptionsFactory.Cache cache = PipelineOptionsFactory.CACHE.get();
-        // We first filter out any properties that have been modified since
-        // the last serialization of this PipelineOptions and then verify that
-        // they are all serializable.
-        Map<String, BoundValue> filteredOptions = Maps.newHashMap(handler.options);
-        Map<String, JsonSerializer<Object>> propertyToSerializer =
-            getSerializerMap(cache, handler.knownInterfaces);
-        removeIgnoredOptions(cache, handler.knownInterfaces, filteredOptions);
-        ensureSerializable(cache, handler.knownInterfaces, filteredOptions, propertyToSerializer);
-
-        // Now we create the map of serializable options by taking the original
-        // set of serialized options (if any) and updating them with any properties
-        // instances that have been modified since the previous serialization.
-        Map<String, Object> serializableOptions = Maps.newHashMap(handler.jsonOptions);
-        for (Map.Entry<String, BoundValue> entry : filteredOptions.entrySet()) {
-          serializableOptions.put(entry.getKey(), entry.getValue().getValue());
-        }
+      PipelineOptionsFactory.Cache cache = PipelineOptionsFactory.CACHE.get();
+      // We first filter out any properties that have been modified since
+      // the last serialization of this PipelineOptions and then verify that
+      // they are all serializable.
+      Map<String, BoundValue> filteredOptions = Maps.newHashMap(handler.options);
+      Set<Class<? extends PipelineOptions>> knownInterfaces =
+          handler.computedProperties.knownInterfaces;
+      Map<String, JsonSerializer<Object>> propertyToSerializer =
+          getSerializerMap(cache, knownInterfaces);
+      removeIgnoredOptions(cache, knownInterfaces, filteredOptions);
+      ensureSerializable(cache, knownInterfaces, filteredOptions, propertyToSerializer);

Review Comment:
   Can we make `getSerializerMap`, `removeIgnoredOptions`, and `ensureSerializable` static methods? That makes the ordering easier to follow, since a static method cannot read `this`.
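
As a rough sketch of the idea, assuming a simplified holder class rather than the real `ProxyInvocationHandler`: the instance method takes one snapshot of the mutable state and passes it to a static helper, which the compiler prevents from touching instance fields. All names here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a handler with mutable state; not Beam's actual class.
final class OptionsHolder {
  private final Map<String, String> options = new HashMap<>();

  void set(String name, String value) {
    options.put(name, value);
  }

  // Instance method: snapshots the state once, then delegates to a static helper.
  Map<String, String> serializable(Set<String> ignored) {
    Map<String, String> snapshot = new HashMap<>(options);
    return removeIgnored(snapshot, ignored);
  }

  // Static: a reference to this.options would not compile here, so the helper
  // can only see the snapshot it was handed.
  static Map<String, String> removeIgnored(Map<String, String> snapshot, Set<String> ignored) {
    Map<String, String> result = new HashMap<>(snapshot);
    result.keySet().removeAll(ignored);
    return result;
  }
}
```

Because every input arrives as a parameter, a reader can see at the call site exactly which snapshot each step operates on.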



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java:
##########
@@ -114,7 +114,8 @@ public class DoFnSignatures {
 
   private DoFnSignatures() {}
 
-  private static final Map<Class<?>, DoFnSignature> signatureCache = new LinkedHashMap<>();
+  private static final Map<Class<? extends DoFn<?, ?>>, DoFnSignature> signatureCache =

Review Comment:
   ```suggestion
     /**
      * Note that special care must be taken to enumerate this object as concurrent hash maps are <a href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly">weakly consistent</a>.
      */
     private static final Map<Class<? extends DoFn<?, ?>>, DoFnSignature> signatureCache =
   ```



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java:
##########
@@ -302,7 +358,7 @@ class PipelineOptionsDisplayData implements HasDisplayData {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       Set<PipelineOptionSpec> optionSpecs =

Review Comment:
   ```suggestion
         // We must first make a copy of the current options because a concurrent modification
         // may add a new option after we have derived optionSpecs but before we have enumerated
         // all the pipeline options.
         Map<String, BoundValue> copiedOptions = new HashMap<>(options);
         Set<PipelineOptionSpec> optionSpecs =
   ```
   And then update usage below to use the copy.
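
The ordering concern can be sketched as follows. This is an illustration under simplifying assumptions: the concurrent writer is simulated inline on the same thread, and `knownProperties` is a hypothetical stand-in for the state that `optionSpecs` is derived from.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration; the names below are not Beam's actual fields.
final class CopyBeforeDerive {
  /** Returns whether every enumerated option key is covered by the derived specs. */
  static boolean specsCoverOptions(boolean copyFirst) {
    ConcurrentHashMap<String, String> options = new ConcurrentHashMap<>();
    Set<String> knownProperties = new HashSet<>();
    knownProperties.add("jobName");
    options.put("jobName", "demo");

    // Step 1: optionally snapshot the options before deriving anything.
    Map<String, String> enumerated =
        copyFirst ? new HashMap<String, String>(options) : options;

    // Step 2: derive the specs from the properties known right now.
    Set<String> optionSpecs = new HashSet<>(knownProperties);

    // Step 3: a writer (simulated inline here) registers and sets a new option.
    knownProperties.add("region");
    options.put("region", "us-central1");

    // Step 4: enumerate. Without the copy, the live map holds a key the specs never saw.
    return optionSpecs.containsAll(enumerated.keySet());
  }
}
```

Here `specsCoverOptions(true)` holds because the copy was taken before the specs were derived, while `specsCoverOptions(false)` does not.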



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java:
##########
@@ -96,21 +98,56 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
    */
   private final int hashCode = ThreadLocalRandom.current().nextInt();
 
-  private final Set<Class<? extends PipelineOptions>> knownInterfaces;
+  private static final class ComputedProperties {
+    final ImmutableClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
+    final ImmutableMap<String, String> gettersToPropertyNames;
+    final ImmutableMap<String, String> settersToPropertyNames;
+    final ImmutableSet<Class<? extends PipelineOptions>> knownInterfaces;
+
+    private ComputedProperties(
+        ImmutableClassToInstanceMap<PipelineOptions> interfaceToProxyCache,
+        ImmutableMap<String, String> gettersToPropertyNames,
+        ImmutableMap<String, String> settersToPropertyNames,
+        ImmutableSet<Class<? extends PipelineOptions>> knownInterfaces) {
+      this.interfaceToProxyCache = interfaceToProxyCache;
+      this.gettersToPropertyNames = gettersToPropertyNames;
+      this.settersToPropertyNames = settersToPropertyNames;
+      this.knownInterfaces = knownInterfaces;
+    }
+
+    <T extends PipelineOptions> ComputedProperties updated(
+        Class<T> iface, T instance, List<PropertyDescriptor> propertyDescriptors) {
+
+      // these all use mutable maps and then copyOf, rather than a builder because builders enforce
+      // all keys are unique, and its possible they are not here.
+      Map<String, String> allNewGetters = Maps.newHashMap(gettersToPropertyNames);
+      Map<String, String> allNewSetters = Maps.newHashMap(settersToPropertyNames);
+      Set<Class<? extends PipelineOptions>> newKnownInterfaces = Sets.newHashSet(knownInterfaces);
+      Map<Class<? extends PipelineOptions>, PipelineOptions> newInterfaceCache =
+          Maps.newHashMap(interfaceToProxyCache);
+
+      allNewGetters.putAll(generateGettersToPropertyNames(propertyDescriptors));
+      allNewSetters.putAll(generateSettersToPropertyNames(propertyDescriptors));
+      newKnownInterfaces.add(iface);
+      newInterfaceCache.put(iface, instance);
+
+      return new ComputedProperties(
+          ImmutableClassToInstanceMap.copyOf(newInterfaceCache),
+          ImmutableMap.copyOf(allNewGetters),
+          ImmutableMap.copyOf(allNewSetters),
+          ImmutableSet.copyOf(newKnownInterfaces));
+    }
+  }
 
-  // ProxyInvocationHandler implements Serializable only for the sake of throwing an informative
-  // exception in writeObject()
   @SuppressFBWarnings("SE_BAD_FIELD")
-  private final ClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
+  private volatile ComputedProperties computedProperties;

Review Comment:
   ```suggestion
     /** Only modified while holding a lock on {@code this}. */
     private volatile ComputedProperties computedProperties;
   ```
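
A minimal sketch of the pattern this comment documents, assuming a much smaller class than the one in the PR: readers load an immutable snapshot through a `volatile` field without locking, and writers build a new snapshot while holding the lock on `this` so that concurrent updates are not lost. The class and its members are hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of a volatile, copy-on-write snapshot; not Beam code.
final class SnapshotHolder {
  // Readers see a fully built, unmodifiable snapshot without taking a lock.
  private volatile Set<String> known = Collections.emptySet();

  // Writers serialize on `this`: read the current snapshot, copy it, publish the copy.
  synchronized void register(String name) {
    Set<String> next = new HashSet<>(known);
    next.add(name);
    known = Collections.unmodifiableSet(next);
  }

  // Lock-free read; the returned set never changes after it is published.
  Set<String> known() {
    return known;
  }
}
```

A snapshot obtained before a later `register` call keeps its old contents, so a reader can derive several values from one snapshot and know they are mutually consistent.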



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java:
##########
@@ -96,21 +98,56 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
    */
   private final int hashCode = ThreadLocalRandom.current().nextInt();
 
-  private final Set<Class<? extends PipelineOptions>> knownInterfaces;
+  private static final class ComputedProperties {
+    final ImmutableClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
+    final ImmutableMap<String, String> gettersToPropertyNames;
+    final ImmutableMap<String, String> settersToPropertyNames;
+    final ImmutableSet<Class<? extends PipelineOptions>> knownInterfaces;
+
+    private ComputedProperties(
+        ImmutableClassToInstanceMap<PipelineOptions> interfaceToProxyCache,
+        ImmutableMap<String, String> gettersToPropertyNames,
+        ImmutableMap<String, String> settersToPropertyNames,
+        ImmutableSet<Class<? extends PipelineOptions>> knownInterfaces) {
+      this.interfaceToProxyCache = interfaceToProxyCache;
+      this.gettersToPropertyNames = gettersToPropertyNames;
+      this.settersToPropertyNames = settersToPropertyNames;
+      this.knownInterfaces = knownInterfaces;
+    }
+
+    <T extends PipelineOptions> ComputedProperties updated(
+        Class<T> iface, T instance, List<PropertyDescriptor> propertyDescriptors) {
+
+      // these all use mutable maps and then copyOf, rather than a builder because builders enforce
+      // all keys are unique, and its possible they are not here.
+      Map<String, String> allNewGetters = Maps.newHashMap(gettersToPropertyNames);
+      Map<String, String> allNewSetters = Maps.newHashMap(settersToPropertyNames);
+      Set<Class<? extends PipelineOptions>> newKnownInterfaces = Sets.newHashSet(knownInterfaces);
+      Map<Class<? extends PipelineOptions>, PipelineOptions> newInterfaceCache =
+          Maps.newHashMap(interfaceToProxyCache);
+
+      allNewGetters.putAll(generateGettersToPropertyNames(propertyDescriptors));
+      allNewSetters.putAll(generateSettersToPropertyNames(propertyDescriptors));
+      newKnownInterfaces.add(iface);
+      newInterfaceCache.put(iface, instance);
+
+      return new ComputedProperties(
+          ImmutableClassToInstanceMap.copyOf(newInterfaceCache),
+          ImmutableMap.copyOf(allNewGetters),
+          ImmutableMap.copyOf(allNewSetters),
+          ImmutableSet.copyOf(newKnownInterfaces));
+    }
+  }
 
-  // ProxyInvocationHandler implements Serializable only for the sake of throwing an informative
-  // exception in writeObject()
   @SuppressFBWarnings("SE_BAD_FIELD")
-  private final ClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
+  private volatile ComputedProperties computedProperties;
 
   // ProxyInvocationHandler implements Serializable only for the sake of throwing an informative
   // exception in writeObject()
   @SuppressFBWarnings("SE_BAD_FIELD")
-  private final Map<String, BoundValue> options;
+  private final ConcurrentHashMap<String, BoundValue> options;

Review Comment:
   ```suggestion
     /**
      * Enumerating {@code options} must always be done on a copy made before accessing or deriving properties from {@code computedProperties} since concurrent hash maps are <a href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly">weakly consistent</a>. This will allow us to ensure that the keys in {@code options} will always be a subset of properties stored in {@code computedProperties}.
      */
     private final ConcurrentHashMap<String, BoundValue> options;
   ```



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java:
##########
@@ -671,50 +726,50 @@ private void serializeEntry(
     public void serialize(PipelineOptions value, JsonGenerator jgen, SerializerProvider provider)
         throws IOException {
       ProxyInvocationHandler handler = (ProxyInvocationHandler) Proxy.getInvocationHandler(value);
-      synchronized (handler) {
-        PipelineOptionsFactory.Cache cache = PipelineOptionsFactory.CACHE.get();
-        // We first filter out any properties that have been modified since
-        // the last serialization of this PipelineOptions and then verify that
-        // they are all serializable.
-        Map<String, BoundValue> filteredOptions = Maps.newHashMap(handler.options);
-        Map<String, JsonSerializer<Object>> propertyToSerializer =
-            getSerializerMap(cache, handler.knownInterfaces);
-        removeIgnoredOptions(cache, handler.knownInterfaces, filteredOptions);
-        ensureSerializable(cache, handler.knownInterfaces, filteredOptions, propertyToSerializer);
-
-        // Now we create the map of serializable options by taking the original
-        // set of serialized options (if any) and updating them with any properties
-        // instances that have been modified since the previous serialization.
-        Map<String, Object> serializableOptions = Maps.newHashMap(handler.jsonOptions);
-        for (Map.Entry<String, BoundValue> entry : filteredOptions.entrySet()) {
-          serializableOptions.put(entry.getKey(), entry.getValue().getValue());
-        }
+      PipelineOptionsFactory.Cache cache = PipelineOptionsFactory.CACHE.get();
+      // We first filter out any properties that have been modified since
+      // the last serialization of this PipelineOptions and then verify that
+      // they are all serializable.

Review Comment:
   ```suggestion
         // We first copy and then filter out any properties that have been modified since
         // the last serialization of this PipelineOptions and then verify that
         // they are all serializable.
   ```


