You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2021/12/09 00:10:14 UTC
[beam] branch master updated: [BEAM-13015] Start integrating a process wide cache. (#16130)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 774008d [BEAM-13015] Start integrating a process wide cache. (#16130)
774008d is described below
commit 774008de21090c635dc23c58b2f7d9d4aaa40cbf
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Wed Dec 8 16:06:46 2021 -0800
[BEAM-13015] Start integrating a process wide cache. (#16130)
This initial implementation limits the process-wide cache to 100 MB (using JAMM to measure object sizes) and stores the process bundle descriptors within it.
It also provides utility operations for creating sub-cache views over cache instances, allowing callers to scope cache usage locally.
---
.../environment/EmbeddedEnvironmentFactory.java | 4 +-
.../fnexecution/control/RemoteExecutionTest.java | 4 +-
.../apache/beam/sdk/options/SdkHarnessOptions.java | 72 ++++++
sdks/java/harness/build.gradle | 11 +
.../beam/fn/harness/ProcessBundleBenchmark.java | 7 +-
.../java/org/apache/beam/fn/harness/Cache.java | 65 ++++++
.../java/org/apache/beam/fn/harness/Caches.java | 259 +++++++++++++++++++++
.../java/org/apache/beam/fn/harness/FnHarness.java | 48 ++--
.../fn/harness/control/ProcessBundleHandler.java | 7 +-
.../org/apache/beam/fn/harness/CachesTest.java | 152 ++++++++++++
.../harness/control/ProcessBundleHandlerTest.java | 40 ++--
11 files changed, 625 insertions(+), 44 deletions(-)
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
index ad40267..1e09d99 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
@@ -106,7 +107,8 @@ public class EmbeddedEnvironmentFactory implements EnvironmentFactory {
controlServer.getApiServiceDescriptor(),
null,
InProcessManagedChannelFactory.create(),
- OutboundObserverFactory.clientDirect());
+ OutboundObserverFactory.clientDirect(),
+ Caches.fromOptions(options));
} catch (NoClassDefFoundError e) {
// TODO: https://issues.apache.org/jira/browse/BEAM-4384 load the FnHarness in a
// Restricted classpath that we control for any user.
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 62f78f2..3cac2ce 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -55,6 +55,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
@@ -220,7 +221,8 @@ public class RemoteExecutionTest implements Serializable {
controlServer.getApiServiceDescriptor(),
null,
InProcessManagedChannelFactory.create(),
- OutboundObserverFactory.clientDirect());
+ OutboundObserverFactory.clientDirect(),
+ Caches.eternal());
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
index 5f15081..4578873 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.checkerframework.checker.index.qual.NonNegative;
/** Options that are used to control configuration of the SDK harness. */
@Experimental(Kind.PORTABILITY)
@@ -97,6 +99,76 @@ public interface SdkHarnessOptions extends PipelineOptions {
void setGroupingTableMaxSizeMb(int value);
/**
+   * Maximum size, in megabytes, of the process wide cache held by the SDK harness. Values cached
+   * within a single bundle as well as across bundles (for example side inputs and user state) are
+   * kept in this cache.
+   *
+   * <p>CAUTION: Choosing a value that is too large makes SDK harness instances more prone to
+   * running out of memory.
+   */
+  @Description(
+      "The size (in MB) for the process wide cache within the SDK harness. "
+          + "The cache is responsible for storing all values which are cached within a bundle "
+          + "and across bundles such as side inputs and user state. "
+          + "CAUTION: If set too large, SDK harness instances may run into "
+          + "OOM conditions more easily.")
+  @Default.InstanceFactory(DefaultMaxCacheMemoryUsageMbFactory.class)
+  @NonNegative
+  int getMaxCacheMemoryUsageMb();
+
+  void setMaxCacheMemoryUsageMb(@NonNegative int value);
+
+  /**
+   * The {@link MaxCacheMemoryUsageMb} implementation that is instantiated to compute the maximum
+   * amount of memory, in megabytes, to allocate to the process wide cache within an SDK harness
+   * instance.
+   *
+   * <p>This parameter will only be used if an explicit value was not specified for {@link
+   * #getMaxCacheMemoryUsageMb() maxCacheMemoryUsageMb}.
+   */
+  @Description(
+      "An instance of this class will be used to specify the maximum amount of memory to allocate "
+          + "to a process wide cache within an SDK harness instance. This parameter will only be "
+          + "used if an explicit value was not specified for --maxCacheMemoryUsageMb.")
+  @Default.Class(DefaultMaxCacheMemoryUsageMb.class)
+  Class<? extends MaxCacheMemoryUsageMb> getMaxCacheMemoryUsageMbClass();
+
+  void setMaxCacheMemoryUsageMbClass(Class<? extends MaxCacheMemoryUsageMb> kls);
+
+ /**
+ * A {@link DefaultValueFactory} which specifies the maximum amount of memory to allocate to the
+ * process wide cache within an SDK harness instance.
+ */
+ class DefaultMaxCacheMemoryUsageMbFactory implements DefaultValueFactory<@NonNegative Integer> {
+
+ @Override
+ public @NonNegative Integer create(PipelineOptions options) {
+ SdkHarnessOptions sdkHarnessOptions = options.as(SdkHarnessOptions.class);
+ return (Integer)
+ checkNotNull(
+ InstanceBuilder.ofType(MaxCacheMemoryUsageMb.class)
+ .fromClass(sdkHarnessOptions.getMaxCacheMemoryUsageMbClass())
+ .build()
+ .getMaxCacheMemoryUsage(options));
+ }
+ }
+
+  /**
+   * Computes the maximum amount of memory, in megabytes, to allocate to the process wide cache
+   * within the current SDK harness instance.
+   */
+  interface MaxCacheMemoryUsageMb {
+    /** Returns the maximum process wide cache size, in megabytes, for the given options. */
+    @NonNegative
+    int getMaxCacheMemoryUsage(PipelineOptions options);
+  }
+
+  /**
+   * Default {@link MaxCacheMemoryUsageMb} which currently ignores the supplied options and returns
+   * a fixed process wide cache limit of 100 MB.
+   *
+   * <p>TODO(BEAM-13015): Detect the amount of memory to use instead of hard-coding to 100.
+   */
+  class DefaultMaxCacheMemoryUsageMb implements MaxCacheMemoryUsageMb {
+    /** Fixed cache limit, in megabytes, used until environment detection is implemented. */
+    private static final int FIXED_MAX_CACHE_MEMORY_USAGE_MB = 100;
+
+    @Override
+    public int getMaxCacheMemoryUsage(PipelineOptions options) {
+      // TODO(BEAM-13015): Detect environment type and produce a value based upon the maximum amount
+      // of memory available.
+      return FIXED_MAX_CACHE_MEMORY_USAGE_MB;
+    }
+  }
+
+ /**
* Defines a log level override for a specific class, package, or name.
*
* <p>The SDK harness supports a logging hierarchy based off of names that are "." separated. It
diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle
index 30c6055..c94f992 100644
--- a/sdks/java/harness/build.gradle
+++ b/sdks/java/harness/build.gradle
@@ -68,6 +68,17 @@ dependencies {
compile library.java.joda_time
compile library.java.slf4j_api
compile library.java.vendored_grpc_1_36_0
+
+ compile 'org.cache2k:cache2k-api:2.4.1.Final'
+ runtime 'org.cache2k:cache2k-core:2.4.1.Final'
+ // cache2k-api:2.4.1.Final has a provided dependency on kotlin-annotations-jvm
+ // providing this dependency is necessary to pass checkerframework validation
+ provided 'org.jetbrains.kotlin:kotlin-annotations-jvm:1.4.10'
+
+ // Swap to use the officially published version of 0.4.x once available
+ // instead of relying on a community published copy. See
+ // https://github.com/jbellis/jamm/issues/44 for additional details.
+ compile 'io.github.stephankoelle:jamm:0.4.1'
testCompile library.java.junit
testCompile library.java.mockito_core
testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
diff --git a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/ProcessBundleBenchmark.java b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/ProcessBundleBenchmark.java
index 25dcded..e44a704 100644
--- a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/ProcessBundleBenchmark.java
+++ b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/ProcessBundleBenchmark.java
@@ -64,6 +64,7 @@ import org.apache.beam.sdk.fn.server.GrpcContextHeaderAccessorProvider;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.ServerFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -131,6 +132,7 @@ public class ProcessBundleBenchmark {
clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
serverFactory);
+ PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
// Create the SDK harness, and wait until it connects
sdkHarnessExecutor = Executors.newSingleThreadExecutor(threadFactory);
sdkHarnessExecutorFuture =
@@ -139,13 +141,14 @@ public class ProcessBundleBenchmark {
try {
FnHarness.main(
WORKER_ID,
- PipelineOptionsFactory.create(),
+ pipelineOptions,
Collections.emptySet(), // Runner capabilities.
loggingServer.getApiServiceDescriptor(),
controlServer.getApiServiceDescriptor(),
null,
ManagedChannelFactory.createDefault(),
- OutboundObserverFactory.clientDirect());
+ OutboundObserverFactory.clientDirect(),
+ Caches.fromOptions(pipelineOptions));
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Cache.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Cache.java
new file mode 100644
index 0000000..c31578d
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Cache.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Associates values with keys so that they can be stored and looked up again later.
+ *
+ * <p>Implementations tolerate concurrent reads and updates, and manage how many entries they hold
+ * on their own so that configured resource limits are respected. A value stored earlier may
+ * therefore be absent on a later lookup.
+ */
+@Experimental(Kind.PORTABILITY)
+@ThreadSafe
+public interface Cache<K, V> {
+
+  /**
+   * Returns the value currently associated with {@code key}, or {@code null} when the cache holds
+   * no value for it.
+   */
+  @Nullable
+  V peek(K key);
+
+  /**
+   * Associates {@code value} with {@code key}, replacing any value previously stored for that key.
+   */
+  void put(K key, V value);
+
+  /**
+   * Returns the value associated with {@code key}. When the cache holds no value for it, {@code
+   * loadingFunction} is invoked to produce one, which is used to populate the cache and returned.
+   */
+  V computeIfAbsent(K key, Function<K, V> loadingFunction);
+
+  /** Drops the entry for {@code key}, if one is present. */
+  void remove(K key);
+
+  /**
+   * Returns a view over the keys in the cache. Every key present when this method is called is
+   * included; keys inserted or removed concurrently may or may not be reflected.
+   */
+  Iterable<K> keys();
+
+  /** Drops every key and value from the cache. */
+  void clear();
+}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
new file mode 100644
index 0000000..5eb26e2
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /**
+   * A cache that never stores any values.
+   *
+   * <p>Entries are configured to expire immediately after being written (zero expire-after-write
+   * with sharp expiry), so values are not retained.
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, V> Cache<K, V> noop() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<CompositeKey, Object> cache =
+        Cache2kBuilder.of(CompositeKey.class, Object.class)
+            .entryCapacity(1)
+            .storeByReference(true)
+            .expireAfterWrite(0, TimeUnit.NANOSECONDS)
+            .sharpExpiry(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /** A cache that never evicts any values. */
+  @SuppressWarnings("unchecked")
+  public static <K, V> Cache<K, V> eternal() {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<CompositeKey, Object> cache =
+        Cache2kBuilder.of(CompositeKey.class, Object.class)
+            .entryCapacity(Long.MAX_VALUE)
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /**
+   * Uses the specified {@link PipelineOptions} to configure and return a cache instance based upon
+   * parameters within {@link SdkHarnessOptions}.
+   *
+   * <p>The cache is bounded by {@link SdkHarnessOptions#getMaxCacheMemoryUsageMb()} megabytes,
+   * where each entry is weighed by measuring the deep size of both its key and value with JAMM.
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
+    // We specifically use cache2k since it allows for recursive computeIfAbsent calls
+    // preventing deadlock from occurring when a loading function mutates the underlying cache
+    org.cache2k.Cache<CompositeKey, Object> cache =
+        Cache2kBuilder.of(CompositeKey.class, Object.class)
+            .maximumWeight(
+                options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1024L * 1024L)
+            .weigher(
+                new Weigher<CompositeKey, Object>() {
+                  private final MemoryMeter memoryMeter = MemoryMeter.builder().build();
+
+                  @Override
+                  public int weigh(CompositeKey key, Object value) {
+                    long size = memoryMeter.measureDeep(key) + memoryMeter.measureDeep(value);
+                    // Weights are ints, so saturate rather than overflow for very large entries.
+                    return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
+                  }
+                })
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor())
+            .build();
+
+    return (Cache<K, V>) forCache(cache);
+  }
+
+  /**
+   * Returns a view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   *
+   * @throws IllegalArgumentException if {@code cache} is {@code null} or was not created by one of
+   *     the factory methods in this class
+   */
+  public static <K, V> Cache<K, V> subCache(
+      Cache<?, ?> cache, Object keyPrefix, Object... additionalKeyPrefix) {
+    if (cache instanceof SubCache) {
+      return new SubCache<>(
+          ((SubCache<?, ?>) cache).cache,
+          ((SubCache<?, ?>) cache).keyPrefix.subKey(keyPrefix, additionalKeyPrefix));
+    }
+    throw new IllegalArgumentException(
+        String.format(
+            "An unsupported type of cache was passed in. Received %s.",
+            cache == null ? "null" : cache.getClass()));
+  }
+
+  /** Wraps a cache2k cache in a root-level {@code SubCache} so it can be further sub-scoped. */
+  private static Cache<Object, Object> forCache(org.cache2k.Cache<CompositeKey, Object> cache) {
+    return new SubCache<>(cache, CompositeKeyPrefix.ROOT);
+  }
+
+  /**
+   * A view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   */
+  private static class SubCache<K, V> implements Cache<K, V> {
+    private final org.cache2k.Cache<CompositeKey, Object> cache;
+    private final CompositeKeyPrefix keyPrefix;
+
+    SubCache(org.cache2k.Cache<CompositeKey, Object> cache, CompositeKeyPrefix keyPrefix) {
+      this.cache = cache;
+      this.keyPrefix = keyPrefix;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public V peek(K key) {
+      return (V) cache.peek(keyPrefix.valueKey(key));
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+      // The loading function receives the caller's key, not the composite key stored in the
+      // backing cache.
+      return (V)
+          cache.computeIfAbsent(keyPrefix.valueKey(key), o -> loadingFunction.apply((K) o.key));
+    }
+
+    @Override
+    public void put(K key, V value) {
+      cache.put(keyPrefix.valueKey(key), value);
+    }
+
+    /** Removes the entries of this view as well as those of any sub-cache nested beneath it. */
+    @Override
+    public void clear() {
+      for (CompositeKey key : Sets.filter(cache.keys(), keyPrefix::isProperPrefixOf)) {
+        cache.remove(key);
+      }
+    }
+
+    /** Returns the keys stored directly in this view, excluding keys of nested sub-caches. */
+    @Override
+    @SuppressWarnings("unchecked")
+    public Iterable<K> keys() {
+      return Iterables.transform(
+          Sets.filter(cache.keys(), keyPrefix::isEquivalentNamespace),
+          input -> (K) Preconditions.checkNotNull(input.key));
+    }
+
+    @Override
+    public void remove(K key) {
+      cache.remove(keyPrefix.valueKey(key));
+    }
+  }
+
+  /** A key prefix used to generate keys that are stored within a sub-cache. */
+  static class CompositeKeyPrefix {
+    public static final CompositeKeyPrefix ROOT = new CompositeKeyPrefix(new Object[0]);
+
+    private final Object[] namespace;
+
+    private CompositeKeyPrefix(Object[] namespace) {
+      this.namespace = namespace;
+    }
+
+    /** Returns a new prefix formed by appending the given suffixes to this prefix's namespace. */
+    CompositeKeyPrefix subKey(Object suffix, Object... additionalSuffixes) {
+      Object[] subKey = new Object[namespace.length + 1 + additionalSuffixes.length];
+      System.arraycopy(namespace, 0, subKey, 0, namespace.length);
+      subKey[namespace.length] = suffix;
+      System.arraycopy(
+          additionalSuffixes, 0, subKey, namespace.length + 1, additionalSuffixes.length);
+      return new CompositeKeyPrefix(subKey);
+    }
+
+    /** Returns the key under which {@code k} is stored within this prefix's namespace. */
+    <K> CompositeKey valueKey(K k) {
+      return new CompositeKey(namespace, k);
+    }
+
+    /**
+     * Returns true if this prefix's namespace is a prefix of {@code otherKey}'s namespace.
+     *
+     * <p>Despite the name, a key whose namespace is exactly equal to this prefix also matches; this
+     * lets {@code SubCache.clear()} remove both its own entries and those of nested sub-caches.
+     */
+    boolean isProperPrefixOf(CompositeKey otherKey) {
+      return namespace.length <= otherKey.namespace.length
+          && leadingPartsMatch(otherKey.namespace);
+    }
+
+    /** Returns true if {@code otherKey}'s namespace is exactly equal to this prefix's namespace. */
+    boolean isEquivalentNamespace(CompositeKey otherKey) {
+      return namespace.length == otherKey.namespace.length
+          && leadingPartsMatch(otherKey.namespace);
+    }
+
+    /**
+     * Compares the first {@code namespace.length} parts of {@code otherNamespace} with this
+     * prefix. Callers must ensure {@code otherNamespace} is at least as long as this prefix.
+     */
+    private boolean leadingPartsMatch(Object[] otherNamespace) {
+      // Do this in reverse order since the suffix is the part most likely to differ first
+      for (int i = namespace.length - 1; i >= 0; --i) {
+        if (!Objects.equals(namespace[i], otherNamespace[i])) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /** A tuple of key parts used to represent a key within a cache. */
+  @VisibleForTesting
+  static class CompositeKey {
+    private final Object[] namespace;
+    private final Object key;
+
+    private CompositeKey(Object[] namespace, Object key) {
+      this.namespace = namespace;
+      this.key = key;
+    }
+
+    @Override
+    public String toString() {
+      return "CompositeKey{" + "namespace=" + Arrays.toString(namespace) + ", key=" + key + "}";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof CompositeKey)) {
+        return false;
+      }
+      CompositeKey that = (CompositeKey) o;
+      return Arrays.equals(namespace, that.namespace) && Objects.equals(key, that.key);
+    }
+
+    @Override
+    public int hashCode() {
+      // Mix in the key: hashing the namespace alone makes every entry of a sub-cache share one
+      // hash code, which degrades lookups in the backing hash table.
+      return 31 * Arrays.hashCode(namespace) + Objects.hashCode(key);
+    }
+  }
+}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index a9636ff..195347a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.fn.harness;
-import java.time.Duration;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
@@ -38,6 +37,7 @@ import org.apache.beam.fn.harness.status.BeamFnStatusClient;
import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
@@ -59,9 +59,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
@@ -180,6 +177,7 @@ public class FnHarness {
}
OutboundObserverFactory outboundObserverFactory =
HarnessStreamObserverFactories.fromOptions(options);
+
main(
id,
options,
@@ -188,7 +186,8 @@ public class FnHarness {
controlApiServiceDescriptor,
statusApiServiceDescriptor,
channelFactory,
- outboundObserverFactory);
+ outboundObserverFactory,
+ Caches.fromOptions(options));
}
/**
@@ -203,6 +202,7 @@ public class FnHarness {
* @param statusApiServiceDescriptor
* @param channelFactory
* @param outboundObserverFactory
+ * @param processWideCache
* @throws Exception
*/
public static void main(
@@ -213,7 +213,8 @@ public class FnHarness {
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor,
Endpoints.ApiServiceDescriptor statusApiServiceDescriptor,
ManagedChannelFactory channelFactory,
- OutboundObserverFactory outboundObserverFactory)
+ OutboundObserverFactory outboundObserverFactory,
+ Cache<?, ?> processWideCache)
throws Exception {
channelFactory =
channelFactory.withInterceptors(ImmutableList.of(AddHarnessIdInterceptor.create(id)));
@@ -226,7 +227,6 @@ public class FnHarness {
try (BeamFnLoggingClient logging =
new BeamFnLoggingClient(
options, loggingApiServiceDescriptor, channelFactory::forDescriptor)) {
-
LOG.info("Fn Harness started");
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(options);
@@ -250,20 +250,24 @@ public class FnHarness {
FinalizeBundleHandler finalizeBundleHandler =
new FinalizeBundleHandler(options.as(GcsOptions.class).getExecutorService());
- LoadingCache<String, BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors =
- CacheBuilder.newBuilder()
- .maximumSize(1000)
- .expireAfterAccess(Duration.ofMinutes(10))
- .build(
- new CacheLoader<String, BeamFnApi.ProcessBundleDescriptor>() {
- @Override
- public BeamFnApi.ProcessBundleDescriptor load(String id) {
- return blockingControlStub.getProcessBundleDescriptor(
- BeamFnApi.GetProcessBundleDescriptorRequest.newBuilder()
- .setProcessBundleDescriptorId(id)
- .build());
- }
- });
+ Function<String, BeamFnApi.ProcessBundleDescriptor> getProcessBundleDescriptor =
+ new Function<String, ProcessBundleDescriptor>() {
+ private static final String PROCESS_BUNDLE_DESCRIPTORS = "ProcessBundleDescriptors";
+ private final Cache<String, BeamFnApi.ProcessBundleDescriptor> cache =
+ Caches.subCache(processWideCache, PROCESS_BUNDLE_DESCRIPTORS);
+
+ @Override
+ public BeamFnApi.ProcessBundleDescriptor apply(String id) {
+ return cache.computeIfAbsent(id, this::loadDescriptor);
+ }
+
+ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) {
+ return blockingControlStub.getProcessBundleDescriptor(
+ BeamFnApi.GetProcessBundleDescriptorRequest.newBuilder()
+ .setProcessBundleDescriptorId(id)
+ .build());
+ }
+ };
MetricsEnvironment.setProcessWideContainer(MetricsContainerImpl.createProcessWideContainer());
@@ -271,7 +275,7 @@ public class FnHarness {
new ProcessBundleHandler(
options,
runnerCapabilites,
- processBundleDescriptors::getUnchecked,
+ getProcessBundleDescriptor,
beamFnDataMultiplexer,
beamFnStateGrpcClientCache,
finalizeBundleHandler,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 0605ac3..1f28523 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -86,7 +86,6 @@ import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.Message;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
@@ -163,7 +162,7 @@ public class ProcessBundleHandler {
};
private final PipelineOptions options;
- private final Function<String, Message> fnApiRegistry;
+ private final Function<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry;
private final BeamFnDataClient beamFnDataClient;
private final BeamFnStateGrpcClientCache beamFnStateGrpcClientCache;
private final LoadingCache<
@@ -180,7 +179,7 @@ public class ProcessBundleHandler {
public ProcessBundleHandler(
PipelineOptions options,
Set<String> runnerCapabilities,
- Function<String, Message> fnApiRegistry,
+ Function<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry,
BeamFnDataClient beamFnDataClient,
BeamFnStateGrpcClientCache beamFnStateGrpcClientCache,
FinalizeBundleHandler finalizeBundleHandler,
@@ -201,7 +200,7 @@ public class ProcessBundleHandler {
ProcessBundleHandler(
PipelineOptions options,
Set<String> runnerCapabilities,
- Function<String, Message> fnApiRegistry,
+ Function<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry,
BeamFnDataClient beamFnDataClient,
BeamFnStateGrpcClientCache beamFnStateGrpcClientCache,
FinalizeBundleHandler finalizeBundleHandler,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java
new file mode 100644
index 0000000..ce101dc
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Caches}: noop, eternal, options-configured and prefix-scoped sub-caches. */
+@RunWith(JUnit4.class)
+public class CachesTest {
+ @Test
+ public void testNoopCache() {
+ Cache<String, String> cache = Caches.noop();
+ cache.put("key", "value"); // discarded by the noop cache
+ assertNull(cache.peek("key"));
+ assertEquals("value", cache.computeIfAbsent("key", (unused) -> "value"));
+ assertNull(cache.peek("key"));
+ assertThat(cache.keys(), is(emptyIterable()));
+ }
+
+ @Test
+ public void testEternalCache() {
+ testCache(Caches.eternal());
+ }
+
+ @Test
+ public void testDefaultCache() {
+ testCache(Caches.fromOptions(PipelineOptionsFactory.create()));
+ }
+
+ @Test
+ public void testSubCache() {
+ testCache(Caches.subCache(Caches.eternal(), "prefix"));
+ }
+
+ @Test
+ public void testSiblingSubCaches() {
+ Cache<String, String> parent = Caches.eternal();
+ Cache<String, String> cacheA = Caches.subCache(parent, "prefixA");
+ Cache<String, String> cacheACopy = Caches.subCache(parent, "prefixA");
+ Cache<String, String> cacheB = Caches.subCache(parent, "prefixB");
+
+ // Values inserted through one prefix are visible to every sub-cache instance sharing that
+ // prefix, but not to a sibling sub-cache created with a different prefix.
+ cacheA.put("keyA", "valueA");
+ assertEquals("valueA", cacheA.peek("keyA"));
+ assertEquals("valueA", cacheACopy.peek("keyA"));
+ assertNull(cacheB.peek("keyA"));
+
+ // Clearing the sibling with a different prefix must leave keys under "prefixA" untouched.
+ cacheB.clear();
+ assertEquals("valueA", cacheA.peek("keyA"));
+ assertEquals("valueA", cacheACopy.peek("keyA"));
+
+ // Clearing any one instance of a prefix clears the entries shared by all its instances.
+ cacheACopy.clear();
+ assertNull(cacheA.peek("keyA"));
+ assertNull(cacheACopy.peek("keyA"));
+ }
+
+ @Test
+ public void testNestedSubCaches() {
+ Cache<String, String> parent = Caches.eternal();
+ Cache<String, String> child = Caches.subCache(parent, "child");
+ Cache<String, String> childOfChild = Caches.subCache(child, "childOfChild");
+
+ // put: the same key at two nesting levels holds independent values.
+ child.put("keyA", "childA");
+ childOfChild.put("keyA", "childOfChildA");
+ assertEquals("childA", child.peek("keyA"));
+ assertEquals("childOfChildA", childOfChild.peek("keyA"));
+
+ // computeIfAbsent: each nesting level loads and stores its own value for the same key.
+ child.computeIfAbsent("keyB", (unused) -> "childB");
+ childOfChild.computeIfAbsent("keyB", (unused) -> "childOfChildB");
+ assertEquals("childB", child.peek("keyB"));
+ assertEquals("childOfChildB", childOfChild.peek("keyB"));
+
+ // Removing a key from a cache must not remove the same key from its nested sub-cache.
+ child.remove("keyA");
+ assertNull(child.peek("keyA"));
+ assertEquals("childOfChildA", childOfChild.peek("keyA"));
+
+ // Removing a key from a nested sub-cache must not remove the same key from its parent.
+ childOfChild.remove("keyB");
+ assertEquals("childB", child.peek("keyB"));
+ assertNull(childOfChild.peek("keyB"));
+
+ // Clearing the middle cache empties it and its sub-caches but leaves the parent's keys.
+ parent.put("keyA", "parentA");
+ parent.put("keyB", "parentB");
+ child.clear();
+ assertThat(child.keys(), is(emptyIterable()));
+ assertThat(childOfChild.keys(), is(emptyIterable()));
+ assertEquals("parentA", parent.peek("keyA"));
+ assertEquals("parentB", parent.peek("keyB"));
+ }
+
+ private void testCache(Cache<String, String> cache) {
+ assertNull(cache.peek("key1")); // the cache under test must start empty
+
+ // put makes the value visible to peek.
+ cache.put("key1", "value1");
+ assertEquals("value1", cache.peek("key1"));
+
+ // computeIfAbsent on a present key returns the existing value and does not overwrite it.
+ assertEquals("value1", cache.computeIfAbsent("key1", (unused) -> "anotherValue"));
+ assertEquals("value1", cache.peek("key1"));
+
+ // computeIfAbsent on a missing key stores and returns the loaded value.
+ assertEquals("value2", cache.computeIfAbsent("key2", (unused) -> "value2"));
+ assertEquals("value2", cache.peek("key2"));
+
+ assertThat(cache.keys(), containsInAnyOrder("key1", "key2"));
+
+ // remove drops only the given key.
+ cache.remove("key1");
+ assertNull(cache.peek("key1"));
+ assertEquals("value2", cache.peek("key2"));
+ assertThat(cache.keys(), containsInAnyOrder("key2"));
+
+ // clear removes every remaining entry.
+ cache.clear();
+ assertNull(cache.peek("key1"));
+ assertNull(cache.peek("key2"));
+ assertThat(cache.keys(), is(emptyIterable()));
+ }
+}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index d609b2f..e8c6678 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -117,7 +117,6 @@ import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.Message;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
@@ -385,7 +384,8 @@ public class ProcessBundleHandlerTest {
.build())
.putPcollections("2L-output-pc", RunnerApi.PCollection.getDefaultInstance())
.build();
- Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+ Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+ ImmutableMap.of("1L", processBundleDescriptor);
List<RunnerApi.PTransform> transformsProcessed = new ArrayList<>();
List<String> orderOfOperations = new ArrayList<>();
@@ -505,7 +505,8 @@ public class ProcessBundleHandlerTest {
.build())
.build())
.build();
- Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+ Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+ ImmutableMap.of("1L", processBundleDescriptor);
Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
@@ -557,7 +558,8 @@ public class ProcessBundleHandlerTest {
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
.build())
.build();
- Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+ Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+ ImmutableMap.of("1L", processBundleDescriptor);
ProcessBundleHandler handler =
new ProcessBundleHandler(
@@ -685,7 +687,8 @@ public class ProcessBundleHandlerTest {
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
.build())
.build();
- Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+ Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+ ImmutableMap.of("1L", processBundleDescriptor);
ProcessBundleHandler handler =
new ProcessBundleHandler(
@@ -724,7 +727,8 @@ public class ProcessBundleHandlerTest {
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
.build())
.build();
- Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+ Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+ ImmutableMap.of("1L", processBundleDescriptor);
FinalizeBundleHandler mockFinalizeBundleHandler = mock(FinalizeBundleHandler.class);
BundleFinalizer.Callback mockCallback = mock(BundleFinalizer.Callback.class);
@@ -780,7 +784,8 @@ public class ProcessBundleHandlerTest {
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
.build())
.build();
- Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+ Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+ ImmutableMap.of("1L", processBundleDescriptor);
ProcessBundleHandler handler =
new ProcessBundleHandler(
@@ -919,7 +924,8 @@ public class ProcessBundleHandlerTest {
.addComponentCoderIds("window-strategy-coder")
.build())
.build();
- Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+ Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+ ImmutableMap.of("1L", processBundleDescriptor);
Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
@@ -1193,7 +1199,8 @@ public class ProcessBundleHandlerTest {
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
.build())
.build();
- Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+ Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+ ImmutableMap.of("1L", processBundleDescriptor);
Mockito.doAnswer(
(invocation) -> {
@@ -1256,7 +1263,8 @@ public class ProcessBundleHandlerTest {
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
.build())
.build();
- Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+ Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+ ImmutableMap.of("1L", processBundleDescriptor);
Mockito.doAnswer(
(invocation) -> {
@@ -1329,7 +1337,8 @@ public class ProcessBundleHandlerTest {
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
.build())
.build();
- Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+ Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+ ImmutableMap.of("1L", processBundleDescriptor);
ProcessBundleHandler handler =
new ProcessBundleHandler(
@@ -1375,7 +1384,8 @@ public class ProcessBundleHandlerTest {
.build())
.setStateApiServiceDescriptor(ApiServiceDescriptor.getDefaultInstance())
.build();
- Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+ Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+ ImmutableMap.of("1L", processBundleDescriptor);
CompletableFuture<StateResponse>[] successfulResponse = new CompletableFuture[1];
CompletableFuture<StateResponse>[] unsuccessfulResponse = new CompletableFuture[1];
@@ -1460,7 +1470,8 @@ public class ProcessBundleHandlerTest {
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
.build())
.build();
- Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+ Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+ ImmutableMap.of("1L", processBundleDescriptor);
ProcessBundleHandler handler =
new ProcessBundleHandler(
@@ -1509,7 +1520,8 @@ public class ProcessBundleHandlerTest {
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
.build())
.build();
- Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+ Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+ ImmutableMap.of("1L", processBundleDescriptor);
ProcessBundleHandler handler =
new ProcessBundleHandler(