You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/07/07 23:21:35 UTC
[beam] branch master updated: Optimize locking in several critical-path methods (#22162)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0e14f5181d8 Optimize locking in several critical-path methods (#22162)
0e14f5181d8 is described below
commit 0e14f5181d8180b3fd88fd83d39f9199a5db99ae
Author: Steven Niemitz <st...@gmail.com>
AuthorDate: Thu Jul 7 19:21:30 2022 -0400
Optimize locking in several critical-path methods (#22162)
* Optimize locking in several critical-path methods
* spotbugs
* Apply suggestions from code review
Co-authored-by: Lukasz Cwik <lc...@google.com>
* review comments
* unit test for concurrency
* the buddy of bytes
Co-authored-by: Lukasz Cwik <lc...@google.com>
---
.../beam/sdk/options/ProxyInvocationHandler.java | 263 +++++++++++++--------
.../reflect/ByteBuddyDoFnInvokerFactory.java | 31 +--
.../sdk/transforms/reflect/DoFnSignatures.java | 14 +-
.../sdk/options/ProxyInvocationHandlerTest.java | 123 ++++++++++
4 files changed, 315 insertions(+), 116 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index d9524115de9..77324eb7f18 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -52,6 +52,7 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.options.PipelineOptionsFactory.AnnotationPredicates;
@@ -63,14 +64,15 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Defaults;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ClassToInstanceMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableClassToInstanceMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.MutableClassToInstanceMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
@@ -96,21 +98,64 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
*/
private final int hashCode = ThreadLocalRandom.current().nextInt();
- private final Set<Class<? extends PipelineOptions>> knownInterfaces;
+ private static final class ComputedProperties {
+ final ImmutableClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
+ final ImmutableMap<String, String> gettersToPropertyNames;
+ final ImmutableMap<String, String> settersToPropertyNames;
+ final ImmutableSet<Class<? extends PipelineOptions>> knownInterfaces;
+
+ private ComputedProperties(
+ ImmutableClassToInstanceMap<PipelineOptions> interfaceToProxyCache,
+ ImmutableMap<String, String> gettersToPropertyNames,
+ ImmutableMap<String, String> settersToPropertyNames,
+ ImmutableSet<Class<? extends PipelineOptions>> knownInterfaces) {
+ this.interfaceToProxyCache = interfaceToProxyCache;
+ this.gettersToPropertyNames = gettersToPropertyNames;
+ this.settersToPropertyNames = settersToPropertyNames;
+ this.knownInterfaces = knownInterfaces;
+ }
- // ProxyInvocationHandler implements Serializable only for the sake of throwing an informative
- // exception in writeObject()
+ <T extends PipelineOptions> ComputedProperties updated(
+ Class<T> iface, T instance, List<PropertyDescriptor> propertyDescriptors) {
+
+ // these all use mutable maps and then copyOf, rather than a builder, because builders enforce
+ // that all keys are unique, and it's possible they are not here.
+ Map<String, String> allNewGetters = Maps.newHashMap(gettersToPropertyNames);
+ Map<String, String> allNewSetters = Maps.newHashMap(settersToPropertyNames);
+ Set<Class<? extends PipelineOptions>> newKnownInterfaces = Sets.newHashSet(knownInterfaces);
+ Map<Class<? extends PipelineOptions>, PipelineOptions> newInterfaceCache =
+ Maps.newHashMap(interfaceToProxyCache);
+
+ allNewGetters.putAll(generateGettersToPropertyNames(propertyDescriptors));
+ allNewSetters.putAll(generateSettersToPropertyNames(propertyDescriptors));
+ newKnownInterfaces.add(iface);
+ newInterfaceCache.put(iface, instance);
+
+ return new ComputedProperties(
+ ImmutableClassToInstanceMap.copyOf(newInterfaceCache),
+ ImmutableMap.copyOf(allNewGetters),
+ ImmutableMap.copyOf(allNewSetters),
+ ImmutableSet.copyOf(newKnownInterfaces));
+ }
+ }
+
+ /** Only modified while holding a lock on {@code this}. */
@SuppressFBWarnings("SE_BAD_FIELD")
- private final ClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
+ private volatile ComputedProperties computedProperties;
// ProxyInvocationHandler implements Serializable only for the sake of throwing an informative
// exception in writeObject()
+ /**
+ * Enumerating {@code options} must always be done on a copy made before accessing or deriving
+ * properties from {@code computedProperties} since concurrent hash maps are <a
+ * href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly">weakly
+ * consistent</a>. This ensures that the keys in {@code options} will always be a
+ * subset of the properties stored in {@code computedProperties}.
+ */
@SuppressFBWarnings("SE_BAD_FIELD")
- private final Map<String, BoundValue> options;
+ private final ConcurrentHashMap<String, BoundValue> options;
- private final Map<String, JsonNode> jsonOptions;
- private final Map<String, String> gettersToPropertyNames;
- private final Map<String, String> settersToPropertyNames;
+ private final ImmutableMap<String, JsonNode> jsonOptions;
ProxyInvocationHandler(Map<String, Object> options) {
this(bindOptions(options), Maps.newHashMap());
@@ -127,15 +172,18 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
private ProxyInvocationHandler(
Map<String, BoundValue> options, Map<String, JsonNode> jsonOptions) {
- this.options = options;
- this.jsonOptions = jsonOptions;
- this.knownInterfaces = new HashSet<>(PipelineOptionsFactory.getRegisteredOptions());
- gettersToPropertyNames = Maps.newHashMap();
- settersToPropertyNames = Maps.newHashMap();
- interfaceToProxyCache = MutableClassToInstanceMap.create();
+ this.options = new ConcurrentHashMap<>(options);
+ this.jsonOptions = ImmutableMap.copyOf(jsonOptions);
+ this.computedProperties =
+ new ComputedProperties(
+ ImmutableClassToInstanceMap.of(),
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ ImmutableSet.copyOf(PipelineOptionsFactory.getRegisteredOptions()));
}
@Override
+ @SuppressFBWarnings("AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION")
public Object invoke(Object proxy, Method method, Object[] args) {
if (args == null && "toString".equals(method.getName())) {
return toString();
@@ -158,29 +206,33 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
return Void.TYPE;
}
String methodName = method.getName();
- synchronized (this) {
- if (gettersToPropertyNames.containsKey(methodName)) {
- String propertyName = gettersToPropertyNames.get(methodName);
- if (!options.containsKey(propertyName)) {
- // Lazy bind the default to the method.
- Object value =
- jsonOptions.containsKey(propertyName)
- ? getValueFromJson(propertyName, method)
- : getDefault((PipelineOptions) proxy, method);
- options.put(propertyName, BoundValue.fromDefault(value));
- }
- return options.get(propertyName).getValue();
- } else if (settersToPropertyNames.containsKey(methodName)) {
- options.put(settersToPropertyNames.get(methodName), BoundValue.fromExplicitOption(args[0]));
- return Void.TYPE;
+
+ ComputedProperties properties = computedProperties;
+ if (properties.gettersToPropertyNames.containsKey(methodName)) {
+ String propertyName = properties.gettersToPropertyNames.get(methodName);
+ // we can't use computeIfAbsent here because evaluating the default may cause more properties
+ // to be evaluated, and computeIfAbsent is not re-entrant.
+ if (!options.containsKey(propertyName)) {
+ // Lazy bind the default to the method.
+ Object value =
+ jsonOptions.containsKey(propertyName)
+ ? getValueFromJson(propertyName, method)
+ : getDefault((PipelineOptions) proxy, method);
+ options.put(propertyName, BoundValue.fromDefault(value));
}
+ return options.get(propertyName).getValue();
+ } else if (properties.settersToPropertyNames.containsKey(methodName)) {
+ options.put(
+ properties.settersToPropertyNames.get(methodName),
+ BoundValue.fromExplicitOption(args[0]));
+ return Void.TYPE;
}
throw new RuntimeException(
"Unknown method [" + method + "] invoked with args [" + Arrays.toString(args) + "].");
}
public String getOptionName(Method method) {
- return gettersToPropertyNames.get(method.getName());
+ return computedProperties.gettersToPropertyNames.get(method.getName());
}
private void writeObject(java.io.ObjectOutputStream stream) throws IOException {
@@ -222,25 +274,36 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
* @param iface The interface that the returned object needs to implement.
* @return An object that implements the interface {@code <T>}.
*/
- synchronized <T extends PipelineOptions> T as(Class<T> iface) {
+ <T extends PipelineOptions> T as(Class<T> iface) {
checkNotNull(iface);
checkArgument(iface.isInterface(), "Not an interface: %s", iface);
- if (!interfaceToProxyCache.containsKey(iface)) {
- Registration<T> registration =
- PipelineOptionsFactory.CACHE.get().validateWellFormed(iface, knownInterfaces);
- List<PropertyDescriptor> propertyDescriptors = registration.getPropertyDescriptors();
- Class<T> proxyClass = registration.getProxyClass();
- gettersToPropertyNames.putAll(generateGettersToPropertyNames(propertyDescriptors));
- settersToPropertyNames.putAll(generateSettersToPropertyNames(propertyDescriptors));
- knownInterfaces.add(iface);
- interfaceToProxyCache.putInstance(
- iface,
- InstanceBuilder.ofType(proxyClass)
- .fromClass(proxyClass)
- .withArg(InvocationHandler.class, this)
- .build());
+
+ T existingOption = computedProperties.interfaceToProxyCache.getInstance(iface);
+ if (existingOption == null) {
+ synchronized (this) {
+ // double check
+ existingOption = computedProperties.interfaceToProxyCache.getInstance(iface);
+ if (existingOption == null) {
+ Registration<T> registration =
+ PipelineOptionsFactory.CACHE
+ .get()
+ .validateWellFormed(iface, computedProperties.knownInterfaces);
+ List<PropertyDescriptor> propertyDescriptors = registration.getPropertyDescriptors();
+
+ Class<T> proxyClass = registration.getProxyClass();
+
+ existingOption =
+ InstanceBuilder.ofType(proxyClass)
+ .fromClass(proxyClass)
+ .withArg(InvocationHandler.class, this)
+ .build();
+
+ computedProperties =
+ computedProperties.updated(iface, existingOption, propertyDescriptors);
+ }
+ }
}
- return interfaceToProxyCache.getInstance(iface);
+ return existingOption;
}
/**
@@ -270,7 +333,8 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
/** Returns a map of properties which correspond to {@link RuntimeValueProvider}. */
public Map<String, Map<String, Object>> outputRuntimeOptions(PipelineOptions options) {
- Set<PipelineOptionSpec> optionSpecs = PipelineOptionsReflector.getOptionSpecs(knownInterfaces);
+ Set<PipelineOptionSpec> optionSpecs =
+ PipelineOptionsReflector.getOptionSpecs(computedProperties.knownInterfaces);
Map<String, Map<String, Object>> properties = Maps.newHashMap();
for (PipelineOptionSpec spec : optionSpecs) {
@@ -301,12 +365,16 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
*/
@Override
public void populateDisplayData(DisplayData.Builder builder) {
+ // We must first make a copy of the current options because a concurrent modification
+ // may add a new option after we have derived optionSpecs but before we have enumerated
+ // all the pipeline options.
+ Map<String, BoundValue> copiedOptions = new HashMap<>(options);
Set<PipelineOptionSpec> optionSpecs =
- PipelineOptionsReflector.getOptionSpecs(knownInterfaces);
+ PipelineOptionsReflector.getOptionSpecs(computedProperties.knownInterfaces);
Multimap<String, PipelineOptionSpec> optionsMap = buildOptionNameToSpecMap(optionSpecs);
- for (Map.Entry<String, BoundValue> option : options.entrySet()) {
+ for (Map.Entry<String, BoundValue> option : copiedOptions.entrySet()) {
BoundValue boundValue = option.getValue();
if (boundValue.isDefault()) {
continue;
@@ -333,7 +401,7 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
}
for (Map.Entry<String, JsonNode> jsonOption : jsonOptions.entrySet()) {
- if (options.containsKey(jsonOption.getKey())) {
+ if (copiedOptions.containsKey(jsonOption.getKey())) {
// Option overwritten since deserialization; don't re-write
continue;
}
@@ -483,10 +551,9 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
* @return A pretty printed string representation of this.
*/
@Override
- public synchronized String toString() {
- SortedMap<String, Object> sortedOptions = new TreeMap<>();
+ public String toString() {
// Add the options that we received from deserialization
- sortedOptions.putAll(jsonOptions);
+ SortedMap<String, Object> sortedOptions = new TreeMap<>(jsonOptions);
// Override with any programmatically set options.
for (Map.Entry<String, BoundValue> entry : options.entrySet()) {
sortedOptions.put(entry.getKey(), entry.getValue().getValue());
@@ -555,7 +622,7 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
}
}
if (method.getReturnType().equals(ValueProvider.class)) {
- String propertyName = gettersToPropertyNames.get(method.getName());
+ String propertyName = computedProperties.gettersToPropertyNames.get(method.getName());
return defaultObject == null
? new RuntimeValueProvider(
method.getName(),
@@ -652,7 +719,7 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
}
static class Serializer extends JsonSerializer<PipelineOptions> {
- private void serializeEntry(
+ private static void serializeEntry(
String name,
Object value,
JsonGenerator jgen,
@@ -671,53 +738,53 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
public void serialize(PipelineOptions value, JsonGenerator jgen, SerializerProvider provider)
throws IOException {
ProxyInvocationHandler handler = (ProxyInvocationHandler) Proxy.getInvocationHandler(value);
- synchronized (handler) {
- PipelineOptionsFactory.Cache cache = PipelineOptionsFactory.CACHE.get();
- // We first filter out any properties that have been modified since
- // the last serialization of this PipelineOptions and then verify that
- // they are all serializable.
- Map<String, BoundValue> filteredOptions = Maps.newHashMap(handler.options);
- Map<String, JsonSerializer<Object>> propertyToSerializer =
- getSerializerMap(cache, handler.knownInterfaces);
- removeIgnoredOptions(cache, handler.knownInterfaces, filteredOptions);
- ensureSerializable(cache, handler.knownInterfaces, filteredOptions, propertyToSerializer);
-
- // Now we create the map of serializable options by taking the original
- // set of serialized options (if any) and updating them with any properties
- // instances that have been modified since the previous serialization.
- Map<String, Object> serializableOptions = Maps.newHashMap(handler.jsonOptions);
- for (Map.Entry<String, BoundValue> entry : filteredOptions.entrySet()) {
- serializableOptions.put(entry.getKey(), entry.getValue().getValue());
- }
-
- jgen.writeStartObject();
- jgen.writeFieldName("options");
+ PipelineOptionsFactory.Cache cache = PipelineOptionsFactory.CACHE.get();
+ // We first copy the options (the properties modified since the last serialization of this
+ // PipelineOptions), then filter out any whose getter is annotated with @JsonIgnore, and
+ // finally verify that the remaining ones are all serializable.
+ Map<String, BoundValue> filteredOptions = Maps.newHashMap(handler.options);
+ Set<Class<? extends PipelineOptions>> knownInterfaces =
+ handler.computedProperties.knownInterfaces;
+ Map<String, JsonSerializer<Object>> propertyToSerializer =
+ getSerializerMap(cache, knownInterfaces);
+ removeIgnoredOptions(cache, knownInterfaces, filteredOptions);
+ ensureSerializable(cache, knownInterfaces, filteredOptions, propertyToSerializer);
+
+ // Now we create the map of serializable options by taking the original
+ // set of serialized options (if any) and updating them with the values of any
+ // properties that have been modified since the previous serialization.
+ Map<String, Object> serializableOptions = Maps.newHashMap(handler.jsonOptions);
+ for (Map.Entry<String, BoundValue> entry : filteredOptions.entrySet()) {
+ serializableOptions.put(entry.getKey(), entry.getValue().getValue());
+ }
- jgen.writeStartObject();
+ jgen.writeStartObject();
+ jgen.writeFieldName("options");
- for (Map.Entry<String, Object> entry : serializableOptions.entrySet()) {
- jgen.writeFieldName(entry.getKey());
- serializeEntry(entry.getKey(), entry.getValue(), jgen, propertyToSerializer);
- }
+ jgen.writeStartObject();
- jgen.writeEndObject();
+ for (Map.Entry<String, Object> entry : serializableOptions.entrySet()) {
+ jgen.writeFieldName(entry.getKey());
+ serializeEntry(entry.getKey(), entry.getValue(), jgen, propertyToSerializer);
+ }
- List<Map<String, Object>> serializedDisplayData = Lists.newArrayList();
- DisplayData displayData = DisplayData.from(value);
- for (DisplayData.Item item : displayData.items()) {
- @SuppressWarnings("unchecked")
- Map<String, Object> serializedItem =
- PipelineOptionsFactory.MAPPER.convertValue(item, Map.class);
- serializedDisplayData.add(serializedItem);
- }
+ jgen.writeEndObject();
- jgen.writeFieldName("display_data");
- jgen.writeObject(serializedDisplayData);
- jgen.writeEndObject();
+ List<Map<String, Object>> serializedDisplayData = Lists.newArrayList();
+ DisplayData displayData = DisplayData.from(value);
+ for (DisplayData.Item item : displayData.items()) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> serializedItem =
+ PipelineOptionsFactory.MAPPER.convertValue(item, Map.class);
+ serializedDisplayData.add(serializedItem);
}
+
+ jgen.writeFieldName("display_data");
+ jgen.writeObject(serializedDisplayData);
+ jgen.writeEndObject();
}
- private Map<String, JsonSerializer<Object>> getSerializerMap(
+ private static Map<String, JsonSerializer<Object>> getSerializerMap(
PipelineOptionsFactory.Cache cache, Set<Class<? extends PipelineOptions>> interfaces) {
Map<String, JsonSerializer<Object>> propertyToSerializer = Maps.newHashMap();
@@ -738,7 +805,7 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
* We remove all properties within the passed in options where there getter is annotated with
* {@link JsonIgnore @JsonIgnore} from the passed in options using the passed in interfaces.
*/
- private void removeIgnoredOptions(
+ private static void removeIgnoredOptions(
PipelineOptionsFactory.Cache cache,
Set<Class<? extends PipelineOptions>> interfaces,
Map<String, ?> options) {
@@ -761,7 +828,7 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
* We use an {@link ObjectMapper} to verify that the passed in options are serializable and
* deserializable.
*/
- private void ensureSerializable(
+ private static void ensureSerializable(
PipelineOptionsFactory.Cache cache,
Set<Class<? extends PipelineOptions>> interfaces,
Map<String, BoundValue> options,
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index ff574bc6cf1..594162809bd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -24,9 +24,9 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
@@ -168,9 +168,13 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
* A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class.
* Needed because generating an invoker class is expensive, and to avoid generating an excessive
* number of classes consuming PermGen memory.
+ *
+ * <p>Note that special care must be taken to enumerate this object as concurrent hash maps are <a
+ * href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly">weakly
+ * consistent</a>.
*/
private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache =
- new LinkedHashMap<>();
+ new ConcurrentHashMap<>();
private ByteBuddyDoFnInvokerFactory() {}
@@ -291,19 +295,18 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
* <p>These are cached such that at most one {@link DoFnInvoker} class exists for a given {@link
* DoFn} class.
*/
- private synchronized Constructor<?> getByteBuddyInvokerConstructor(DoFnSignature signature) {
+ private Constructor<?> getByteBuddyInvokerConstructor(DoFnSignature signature) {
Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
- Constructor<?> constructor = byteBuddyInvokerConstructorCache.get(fnClass);
- if (constructor == null) {
- Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature);
- try {
- constructor = invokerClass.getConstructor(fnClass);
- } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
- throw new RuntimeException(e);
- }
- byteBuddyInvokerConstructorCache.put(fnClass, constructor);
- }
- return constructor;
+ return byteBuddyInvokerConstructorCache.computeIfAbsent(
+ fnClass,
+ clazz -> {
+ Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature);
+ try {
+ return invokerClass.getConstructor(clazz);
+ } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
/** Default implementation of {@link DoFn.SplitRestriction}, for delegation by bytebuddy. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 748c1b9fdb6..632c30a3d99 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -33,10 +33,10 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
@@ -114,7 +114,13 @@ public class DoFnSignatures {
private DoFnSignatures() {}
- private static final Map<Class<?>, DoFnSignature> signatureCache = new LinkedHashMap<>();
+ /**
+ * Note that special care must be taken to enumerate this object as concurrent hash maps are <a
+ * href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly">weakly
+ * consistent</a>.
+ */
+ private static final Map<Class<? extends DoFn<?, ?>>, DoFnSignature> signatureCache =
+ new ConcurrentHashMap<>();
private static final ImmutableList<Class<? extends Parameter>>
ALLOWED_NON_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS =
@@ -290,8 +296,8 @@ public class DoFnSignatures {
}
/** @return the {@link DoFnSignature} for the given {@link DoFn} subclass. */
- public static synchronized <FnT extends DoFn<?, ?>> DoFnSignature getSignature(Class<FnT> fn) {
- return signatureCache.computeIfAbsent(fn, k -> parseSignature(fn));
+ public static <FnT extends DoFn<?, ?>> DoFnSignature getSignature(Class<FnT> fn) {
+ return signatureCache.computeIfAbsent(fn, DoFnSignatures::parseSignature);
}
/**
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 4e55e9636f9..211d89d9b74 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -41,31 +41,47 @@ import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.fasterxml.jackson.databind.util.TokenBuffer;
import com.google.common.testing.EqualsTester;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.Serializable;
+import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy.ByteBuddy;
+import org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy.description.modifier.Visibility;
+import org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy.description.type.TypeDescription;
+import org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy.dynamic.DynamicType;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.apache.commons.lang3.SystemUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matchers;
@@ -1246,4 +1262,111 @@ public class ProxyInvocationHandlerTest {
handler.as(BaseOptions.class);
assertEquals("foo", handler.getOptionName(BaseOptions.class.getMethod("getFoo")));
}
+
+ private DynamicType.Unloaded<? extends PipelineOptions> spinNewInterface(int methodNumber) {
+ return new ByteBuddy()
+ .makeInterface(PipelineOptions.class)
+ .defineMethod("getDynamicMethod" + methodNumber, String.class, Visibility.PUBLIC)
+ .withoutCode()
+ .defineMethod("setDynamicMethod" + methodNumber, Void.TYPE, Visibility.PUBLIC)
+ .withParameters(String.class)
+ .withoutCode()
+ .make();
+ }
+
+ private Map<Integer, Class<? extends PipelineOptions>> loadAllInterfaces(
+ int numInterfaces, ClassLoader classLoader) {
+ List<DynamicType.Unloaded<? extends PipelineOptions>> dynamicInterfaces =
+ IntStream.range(0, numInterfaces)
+ .mapToObj(this::spinNewInterface)
+ .collect(Collectors.toList());
+
+ DynamicType.Loaded<Object> root =
+ new ByteBuddy().subclass(Object.class).make().include(dynamicInterfaces).load(classLoader);
+
+ Map<TypeDescription, Class<?>> loadedInterfaces = root.getLoadedAuxiliaryTypes();
+ Map<Integer, Class<? extends PipelineOptions>> result = Maps.newHashMap();
+
+ IntStream.range(0, numInterfaces)
+ .forEach(
+ i -> {
+ DynamicType.Unloaded<? extends PipelineOptions> iface = dynamicInterfaces.get(i);
+ Class<?> clazz = loadedInterfaces.get(iface.getTypeDescription());
+ result.put(i, (Class<? extends PipelineOptions>) clazz);
+ });
+
+ return result;
+ }
+
+ @Test
+ public void testConcurrency() throws Exception {
+ int numInterfaces = 100;
+ int numWorkers = 10;
+ int numReaders = 10;
+ int step = numInterfaces / numWorkers;
+
+ ProxyInvocationHandler handler = new ProxyInvocationHandler(Maps.newHashMap());
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ Map<Integer, Class<? extends PipelineOptions>> ifaces = loadAllInterfaces(numInterfaces, cl);
+ CountDownLatch startWaiter = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(numWorkers);
+
+ ExecutorService executor = Executors.newFixedThreadPool(numWorkers + numReaders);
+ List<Future<?>> futs = Lists.newArrayList();
+
+ // launch `numWorkers` concurrent "writers" that will try to cast the proxy handler to various
+ // interfaces and set a value on them.
+ for (int start = 0; start < numInterfaces; start += step) {
+ final int s = start;
+ final int end = start + step;
+ Callable<Void> worker =
+ () -> {
+ startWaiter.await();
+ for (int i = s; i < end; i++) {
+ Class<? extends PipelineOptions> iface = ifaces.get(i);
+ Method setter = iface.getDeclaredMethod("setDynamicMethod" + i, String.class);
+ PipelineOptions opt1 = handler.as(iface);
+ setter.invoke(opt1, "test-" + i);
+ }
+ done.countDown();
+ return null;
+ };
+ futs.add(executor.submit(worker));
+ }
+
+ // launch concurrent readers that call `toString` and serialize the options, making sure they
+ // don't fail.
+ for (int i = 0; i < 10; i++) {
+ Callable<Void> worker =
+ () -> {
+ while (true) {
+ if (done.getCount() == 0) {
+ return null;
+ }
+ assertNotNull(handler.toString());
+ PipelineOptionsFactory.MAPPER.writeValue(
+ ByteStreams.nullOutputStream(), handler.as(PipelineOptions.class));
+ }
+ };
+ futs.add(executor.submit(worker));
+ }
+
+ // wait for everything to finish
+ startWaiter.countDown();
+ for (Future<?> fut : futs) {
+ fut.get();
+ }
+
+ // ensure that the serialized json contains every value we set.
+ TokenBuffer tokenBuffer = new TokenBuffer(PipelineOptionsFactory.MAPPER, false);
+ PipelineOptionsFactory.MAPPER.writeValue(tokenBuffer, handler.as(PipelineOptions.class));
+
+ JsonNode tree = PipelineOptionsFactory.MAPPER.readTree(tokenBuffer.asParser());
+ JsonNode optionsNode = tree.get("options");
+ for (int i = 0; i < numInterfaces; i++) {
+ JsonNode methodNode = optionsNode.get("dynamicMethod" + i);
+ assertNotNull(methodNode);
+ assertEquals("test-" + i, methodNode.asText());
+ }
+ }
}