You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2021/12/09 00:10:14 UTC

[beam] branch master updated: [BEAM-13015] Start integrating a process wide cache. (#16130)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 774008d  [BEAM-13015] Start integrating a process wide cache. (#16130)
774008d is described below

commit 774008de21090c635dc23c58b2f7d9d4aaa40cbf
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Wed Dec 8 16:06:46 2021 -0800

    [BEAM-13015] Start integrating a process wide cache. (#16130)
    
    This initial implementation limits the process wide cache to 100mb using JAMM to measure object sizes and stores the bundle descriptors within it.
    
    It also provides a bunch of utility operations on caches to support sub-cache views over cache instances allowing one to locally scope the cache.
---
 .../environment/EmbeddedEnvironmentFactory.java    |   4 +-
 .../fnexecution/control/RemoteExecutionTest.java   |   4 +-
 .../apache/beam/sdk/options/SdkHarnessOptions.java |  72 ++++++
 sdks/java/harness/build.gradle                     |  11 +
 .../beam/fn/harness/ProcessBundleBenchmark.java    |   7 +-
 .../java/org/apache/beam/fn/harness/Cache.java     |  65 ++++++
 .../java/org/apache/beam/fn/harness/Caches.java    | 259 +++++++++++++++++++++
 .../java/org/apache/beam/fn/harness/FnHarness.java |  48 ++--
 .../fn/harness/control/ProcessBundleHandler.java   |   7 +-
 .../org/apache/beam/fn/harness/CachesTest.java     | 152 ++++++++++++
 .../harness/control/ProcessBundleHandlerTest.java  |  40 ++--
 11 files changed, 625 insertions(+), 44 deletions(-)

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
index ad40267..1e09d99 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
+import org.apache.beam.fn.harness.Caches;
 import org.apache.beam.fn.harness.FnHarness;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
@@ -106,7 +107,8 @@ public class EmbeddedEnvironmentFactory implements EnvironmentFactory {
                     controlServer.getApiServiceDescriptor(),
                     null,
                     InProcessManagedChannelFactory.create(),
-                    OutboundObserverFactory.clientDirect());
+                    OutboundObserverFactory.clientDirect(),
+                    Caches.fromOptions(options));
               } catch (NoClassDefFoundError e) {
                 // TODO: https://issues.apache.org/jira/browse/BEAM-4384 load the FnHarness in a
                 // Restricted classpath that we control for any user.
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 62f78f2..3cac2ce 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -55,6 +55,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import org.apache.beam.fn.harness.Caches;
 import org.apache.beam.fn.harness.FnHarness;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
@@ -220,7 +221,8 @@ public class RemoteExecutionTest implements Serializable {
                     controlServer.getApiServiceDescriptor(),
                     null,
                     InProcessManagedChannelFactory.create(),
-                    OutboundObserverFactory.clientDirect());
+                    OutboundObserverFactory.clientDirect(),
+                    Caches.eternal());
               } catch (Exception e) {
                 throw new RuntimeException(e);
               }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
index 5f15081..4578873 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.checkerframework.checker.index.qual.NonNegative;
 
 /** Options that are used to control configuration of the SDK harness. */
 @Experimental(Kind.PORTABILITY)
@@ -97,6 +99,76 @@ public interface SdkHarnessOptions extends PipelineOptions {
   void setGroupingTableMaxSizeMb(int value);
 
   /**
+   * Size (in MB) for the process wide cache within the SDK harness. The cache is responsible for
+   * storing all values which are cached within a bundle and across bundles such as side inputs and
+   * user state.
+   *
+   * <p>CAUTION: If set too large, SDK harness instances may run into OOM conditions more easily.
+   */
+  @Description(
+      "The size (in MB) for the process wide cache within the SDK harness. The cache is responsible for "
+          + "storing all values which are cached within a bundle and across bundles such as side inputs "
+          + "and user state. CAUTION: If set too large, SDK harness instances may run into OOM conditions more easily.")
+  @Default.InstanceFactory(DefaultMaxCacheMemoryUsageMbFactory.class)
+  @NonNegative
+  int getMaxCacheMemoryUsageMb();
+
+  void setMaxCacheMemoryUsageMb(@NonNegative int value);
+
+  /**
+   * An instance of this class will be used to specify the maximum amount of memory to allocate to a
+   * cache within an SDK harness instance.
+   *
+   * <p>This parameter will only be used if an explicit value was not specified for {@link
+   * #getMaxCacheMemoryUsageMb() maxCacheMemoryUsageMb}.
+   *
+   * <p>Defaults to {@link DefaultMaxCacheMemoryUsageMb}. The class is instantiated by {@link
+   * DefaultMaxCacheMemoryUsageMbFactory} when the default value is computed.
+   */
+  @Description(
+      "An instance of this class will be used to specify the maximum amount of memory to allocate to a "
+          + " process wide cache within an SDK harness instance. This parameter will only be used if an explicit value was not specified for --maxCacheMemoryUsageMb.")
+  @Default.Class(DefaultMaxCacheMemoryUsageMb.class)
+  Class<? extends MaxCacheMemoryUsageMb> getMaxCacheMemoryUsageMbClass();
+
+  /** Sets the {@link MaxCacheMemoryUsageMb} implementation used to compute the default limit. */
+  void setMaxCacheMemoryUsageMbClass(Class<? extends MaxCacheMemoryUsageMb> kls);
+
+  /**
+   * The {@link DefaultValueFactory} backing {@link #getMaxCacheMemoryUsageMb()}: it supplies the
+   * memory limit (in MB) for the process wide cache when no explicit value was configured.
+   */
+  class DefaultMaxCacheMemoryUsageMbFactory implements DefaultValueFactory<@NonNegative Integer> {
+
+    /**
+     * Instantiates the class configured via {@link #getMaxCacheMemoryUsageMbClass()} and asks it
+     * for the limit to use.
+     *
+     * @param options the pipeline options the limit is computed for
+     * @return the limit reported by the configured {@link MaxCacheMemoryUsageMb} instance
+     */
+    @Override
+    public @NonNegative Integer create(PipelineOptions options) {
+      Class<? extends MaxCacheMemoryUsageMb> providerClass =
+          options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMbClass();
+      MaxCacheMemoryUsageMb provider =
+          InstanceBuilder.ofType(MaxCacheMemoryUsageMb.class).fromClass(providerClass).build();
+      return (Integer) checkNotNull(provider.getMaxCacheMemoryUsage(options));
+    }
+  }
+
+  /**
+   * Specifies the maximum amount of memory to use within the current SDK harness instance.
+   *
+   * <p>{@link DefaultMaxCacheMemoryUsageMbFactory} returns the reported value unchanged as the
+   * default for {@link #getMaxCacheMemoryUsageMb()}, so the value is interpreted in MB.
+   */
+  interface MaxCacheMemoryUsageMb {
+    /**
+     * Returns the non-negative memory limit for the given options.
+     *
+     * @param options the pipeline options the limit is requested for
+     */
+    @NonNegative
+    int getMaxCacheMemoryUsage(PipelineOptions options);
+  }
+
+  /**
+   * The default implementation which decides how much memory to use for a process wide cache.
+   *
+   * <p>No detection is performed yet: the value is currently hard-coded to 100 regardless of the
+   * supplied options.
+   *
+   * <p>TODO(BEAM-13015): Detect the amount of memory to use instead of hard-coding to 100.
+   */
+  class DefaultMaxCacheMemoryUsageMb implements MaxCacheMemoryUsageMb {
+    /** Returns the fixed limit of 100; {@code options} is not consulted at present. */
+    @Override
+    public int getMaxCacheMemoryUsage(PipelineOptions options) {
+      // TODO(BEAM-13015): Detect environment type and produce a value based upon the maximum amount
+      // of memory available.
+      return 100;
+    }
+  }
+
+  /**
    * Defines a log level override for a specific class, package, or name.
    *
    * <p>The SDK harness supports a logging hierarchy based off of names that are "." separated. It
diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle
index 30c6055..c94f992 100644
--- a/sdks/java/harness/build.gradle
+++ b/sdks/java/harness/build.gradle
@@ -68,6 +68,17 @@ dependencies {
   compile library.java.joda_time
   compile library.java.slf4j_api
   compile library.java.vendored_grpc_1_36_0
+
+  compile 'org.cache2k:cache2k-api:2.4.1.Final'
+  runtime 'org.cache2k:cache2k-core:2.4.1.Final'
+  // cache2k-api:2.4.1.Final has a provided dependency on kotlin-annotations-jvm
+  // providing this dependency is necessary to pass checkerframework validation
+  provided 'org.jetbrains.kotlin:kotlin-annotations-jvm:1.4.10'
+
+  // Swap to use the officially published version of 0.4.x once available
+  // instead of relying on a community published copy. See
+  // https://github.com/jbellis/jamm/issues/44 for additional details.
+  compile 'io.github.stephankoelle:jamm:0.4.1'
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
diff --git a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/ProcessBundleBenchmark.java b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/ProcessBundleBenchmark.java
index 25dcded..e44a704 100644
--- a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/ProcessBundleBenchmark.java
+++ b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/ProcessBundleBenchmark.java
@@ -64,6 +64,7 @@ import org.apache.beam.sdk.fn.server.GrpcContextHeaderAccessorProvider;
 import org.apache.beam.sdk.fn.server.GrpcFnServer;
 import org.apache.beam.sdk.fn.server.ServerFactory;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -131,6 +132,7 @@ public class ProcessBundleBenchmark {
                     clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
                 serverFactory);
 
+        PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
         // Create the SDK harness, and wait until it connects
         sdkHarnessExecutor = Executors.newSingleThreadExecutor(threadFactory);
         sdkHarnessExecutorFuture =
@@ -139,13 +141,14 @@ public class ProcessBundleBenchmark {
                   try {
                     FnHarness.main(
                         WORKER_ID,
-                        PipelineOptionsFactory.create(),
+                        pipelineOptions,
                         Collections.emptySet(), // Runner capabilities.
                         loggingServer.getApiServiceDescriptor(),
                         controlServer.getApiServiceDescriptor(),
                         null,
                         ManagedChannelFactory.createDefault(),
-                        OutboundObserverFactory.clientDirect());
+                        OutboundObserverFactory.clientDirect(),
+                        Caches.fromOptions(pipelineOptions));
                   } catch (Exception e) {
                     throw new RuntimeException(e);
                   }
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Cache.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Cache.java
new file mode 100644
index 0000000..c31578d
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Cache.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A cache allows for the storage and retrieval of values which are associated with keys.
+ *
+ * <p>The cache allows for concurrent access and modification to its content and automatically
+ * controls the amount of entries in the cache to stay within configured resource limits.
+ *
+ * @param <K> the type of the keys used to look up values
+ * @param <V> the type of the values held by the cache
+ */
+@Experimental(Kind.PORTABILITY)
+@ThreadSafe
+public interface Cache<K, V> {
+  /**
+   * Looks up the specified key returning {@code null} if the value is not within the cache.
+   *
+   * @param key the key to look up
+   * @return the associated value, or {@code null} when there is none
+   */
+  @Nullable
+  V peek(K key);
+
+  /**
+   * Looks up the specified key and returns the associated value.
+   *
+   * <p>If the key is not present in the cache, the specified function will be used to load and
+   * populate the cache.
+   *
+   * @param key the key to look up
+   * @param loadingFunction invoked with the key to produce the value when it is absent
+   * @return the value associated with the key
+   */
+  V computeIfAbsent(K key, Function<K, V> loadingFunction);
+
+  /**
+   * Inserts a new value associated with the given key or updates an existing association of the
+   * same key with the new value.
+   *
+   * @param key the key to associate the value with
+   * @param value the value to store
+   */
+  void put(K key, V value);
+
+  /**
+   * Removes the mapping for a key from the cache if it is present.
+   *
+   * @param key the key whose mapping should be removed
+   */
+  void remove(K key);
+
+  /** Clears all keys and values in the cache. */
+  void clear();
+
+  /**
+   * A view of all keys in the cache. The view is guaranteed to contain all keys present in the
+   * cache at the time of calling the method, and may or may not reflect concurrent inserts or
+   * removals.
+   *
+   * @return an iterable over the keys of this cache
+   */
+  Iterable<K> keys();
+}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
new file mode 100644
index 0000000..5eb26e2
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.cache2k.Cache2kBuilder;
+import org.cache2k.operation.Weigher;
+import org.github.jamm.MemoryMeter;
+
+/** Utility methods used to instantiate and operate over cache instances. */
+@SuppressWarnings("nullness")
+public final class Caches {
+
+  /**
+   * Returns a cache that does not retain values: entries are configured to expire immediately
+   * after being written.
+   */
+  public static <K, V> Cache<K, V> noop() {
+    // cache2k is a deliberate choice: it tolerates recursive computeIfAbsent calls, so a loading
+    // function which itself mutates the underlying cache does not deadlock.
+    Cache2kBuilder<CompositeKey, Object> builder =
+        Cache2kBuilder.of(CompositeKey.class, Object.class)
+            .entryCapacity(1)
+            .storeByReference(true)
+            .expireAfterWrite(0, TimeUnit.NANOSECONDS)
+            .sharpExpiry(true)
+            .executor(MoreExecutors.directExecutor());
+
+    return (Cache<K, V>) forCache(builder.build());
+  }
+
+  /** Returns a cache whose entry capacity is unbounded, so values are never evicted. */
+  public static <K, V> Cache<K, V> eternal() {
+    // cache2k is a deliberate choice: it tolerates recursive computeIfAbsent calls, so a loading
+    // function which itself mutates the underlying cache does not deadlock.
+    Cache2kBuilder<CompositeKey, Object> builder =
+        Cache2kBuilder.of(CompositeKey.class, Object.class)
+            .entryCapacity(Long.MAX_VALUE)
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor());
+    return (Cache<K, V>) forCache(builder.build());
+  }
+
+  /**
+   * Builds a weight-bounded cache configured from the {@link SdkHarnessOptions} view of the given
+   * {@link PipelineOptions}.
+   *
+   * <p>The weight limit is {@link SdkHarnessOptions#getMaxCacheMemoryUsageMb()} converted to bytes.
+   * Each entry weighs the deep size of its key plus the deep size of its value as measured by
+   * JAMM, capped at {@link Integer#MAX_VALUE}.
+   */
+  public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
+    long maxWeightBytes =
+        options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1024L * 1024L;
+
+    // cache2k is a deliberate choice: it tolerates recursive computeIfAbsent calls, so a loading
+    // function which itself mutates the underlying cache does not deadlock.
+    Cache2kBuilder<CompositeKey, Object> builder =
+        Cache2kBuilder.of(CompositeKey.class, Object.class)
+            .maximumWeight(maxWeightBytes)
+            .weigher(
+                new Weigher<CompositeKey, Object>() {
+                  private final MemoryMeter memoryMeter = MemoryMeter.builder().build();
+
+                  @Override
+                  public int weigh(CompositeKey key, Object value) {
+                    long keySize = memoryMeter.measureDeep(key);
+                    long valueSize = memoryMeter.measureDeep(value);
+                    // The weigher must report an int, so clamp oversized entries.
+                    return (int) Math.min(keySize + valueSize, Integer.MAX_VALUE);
+                  }
+                })
+            .storeByReference(true)
+            .executor(MoreExecutors.directExecutor());
+
+    return (Cache<K, V>) forCache(builder.build());
+  }
+
+  /**
+   * Creates a view over the given cache whose keys all live under an additional key prefix.
+   *
+   * <p>Every lookup, insertion, and removal made through the returned view is applied to the
+   * parent {@link Cache} with the prefixes of the parent followed by the specified prefixes.
+   *
+   * <p>Operations spanning the whole cache's contents, such as {@link Cache#clear}, only touch
+   * keys carrying those prefixes.
+   *
+   * @throws IllegalArgumentException if {@code cache} is {@code null} or was not created by this
+   *     class
+   */
+  public static <K, V> Cache<K, V> subCache(
+      Cache<?, ?> cache, Object keyPrefix, Object... additionalKeyPrefix) {
+    if (!(cache instanceof SubCache)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "An unsupported type of cache was passed in. Received %s.",
+              cache == null ? "null" : cache.getClass()));
+    }
+    SubCache<?, ?> parent = (SubCache<?, ?>) cache;
+    CompositeKeyPrefix childPrefix = parent.keyPrefix.subKey(keyPrefix, additionalKeyPrefix);
+    return new SubCache<>(parent.cache, childPrefix);
+  }
+
+  /** Wraps the given cache2k instance in a view rooted at the empty key prefix. */
+  private static Cache<Object, Object> forCache(org.cache2k.Cache<CompositeKey, Object> cache) {
+    CompositeKeyPrefix rootPrefix = CompositeKeyPrefix.ROOT;
+    return new SubCache<Object, Object>(cache, rootPrefix);
+  }
+
+  /**
+   * A view of a cache that operates on keys with a specified key prefix.
+   *
+   * <p>All lookups, insertions, and removals into the parent {@link Cache} will be prefixed by the
+   * specified prefixes.
+   *
+   * <p>Operations which operate over the entire caches contents such as {@link Cache#clear} only
+   * operate over keys with the specified prefixes.
+   */
+  private static class SubCache<K, V> implements Cache<K, V> {
+    private final org.cache2k.Cache<CompositeKey, Object> cache;
+    private final CompositeKeyPrefix keyPrefix;
+
+    SubCache(org.cache2k.Cache<CompositeKey, Object> cache, CompositeKeyPrefix keyPrefix) {
+      this.cache = cache;
+      this.keyPrefix = keyPrefix;
+    }
+
+    @Override
+    public V peek(K key) {
+      return (V) cache.peek(keyPrefix.valueKey(key));
+    }
+
+    @Override
+    public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
+      return (V)
+          cache.computeIfAbsent(keyPrefix.valueKey(key), o -> loadingFunction.apply((K) o.key));
+    }
+
+    @Override
+    public void put(K key, V value) {
+      cache.put(keyPrefix.valueKey(key), value);
+    }
+
+    @Override
+    public void clear() {
+      for (CompositeKey key : Sets.filter(cache.keys(), keyPrefix::isProperPrefixOf)) {
+        cache.remove(key);
+      }
+    }
+
+    @Override
+    public Iterable<K> keys() {
+      return Iterables.transform(
+          Sets.filter(cache.keys(), keyPrefix::isEquivalentNamespace),
+          input -> (K) Preconditions.checkNotNull(input.key));
+    }
+
+    @Override
+    public void remove(K key) {
+      cache.remove(keyPrefix.valueKey(key));
+    }
+  }
+
+  /** The namespace of a sub-cache, used to derive the keys stored on behalf of that sub-cache. */
+  static class CompositeKeyPrefix {
+    public static final CompositeKeyPrefix ROOT = new CompositeKeyPrefix(new Object[0]);
+
+    private final Object[] namespace;
+
+    private CompositeKeyPrefix(Object[] namespace) {
+      this.namespace = namespace;
+    }
+
+    /** Returns a prefix made of this namespace followed by the given suffix parts, in order. */
+    CompositeKeyPrefix subKey(Object suffix, Object... additionalSuffixes) {
+      int firstSuffixIndex = namespace.length;
+      Object[] extended =
+          Arrays.copyOf(namespace, firstSuffixIndex + 1 + additionalSuffixes.length);
+      extended[firstSuffixIndex] = suffix;
+      System.arraycopy(
+          additionalSuffixes, 0, extended, firstSuffixIndex + 1, additionalSuffixes.length);
+      return new CompositeKeyPrefix(extended);
+    }
+
+    /** Returns the composite key identifying {@code k} within this namespace. */
+    <K> CompositeKey valueKey(K k) {
+      return new CompositeKey(namespace, k);
+    }
+
+    /**
+     * Returns whether the namespace of {@code otherKey} starts with this namespace. A namespace of
+     * the same length with equal parts also matches.
+     */
+    boolean isProperPrefixOf(CompositeKey otherKey) {
+      return namespace.length <= otherKey.namespace.length && matchesLeadingParts(otherKey);
+    }
+
+    /** Returns whether {@code otherKey} was created for exactly this namespace. */
+    boolean isEquivalentNamespace(CompositeKey otherKey) {
+      return namespace.length == otherKey.namespace.length && matchesLeadingParts(otherKey);
+    }
+
+    /**
+     * Compares every part of this namespace against the part at the same index of {@code
+     * otherKey}. Callers must ensure that the other namespace is at least as long as this one.
+     */
+    private boolean matchesLeadingParts(CompositeKey otherKey) {
+      // Walk backwards since the suffix is the part most likely to differ first.
+      for (int i = namespace.length - 1; i >= 0; --i) {
+        if (!Objects.equals(namespace[i], otherKey.namespace[i])) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * A tuple of key parts used to represent a key within a cache.
+   *
+   * <p>It pairs the namespace of the owning sub-cache with the key supplied by the caller. Two
+   * instances are equal when both their namespaces and their keys are equal.
+   */
+  @VisibleForTesting
+  static class CompositeKey {
+    private final Object[] namespace;
+    private final Object key;
+
+    private CompositeKey(Object[] namespace, Object key) {
+      this.namespace = namespace;
+      this.key = key;
+    }
+
+    @Override
+    public String toString() {
+      return "CompositeKey{" + "namespace=" + Arrays.toString(namespace) + ", key=" + key + "}";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof CompositeKey)) {
+        return false;
+      }
+      CompositeKey that = (CompositeKey) o;
+      return Arrays.equals(namespace, that.namespace) && Objects.equals(key, that.key);
+    }
+
+    @Override
+    public int hashCode() {
+      // Hash the same components that equals compares. Hashing only the namespace would give
+      // every entry of a sub-cache the same hash code and degrade hash-based lookups.
+      return 31 * Arrays.hashCode(namespace) + Objects.hashCode(key);
+    }
+  }
+}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index a9636ff..195347a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.fn.harness;
 
-import java.time.Duration;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.List;
@@ -38,6 +37,7 @@ import org.apache.beam.fn.harness.status.BeamFnStatusClient;
 import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
@@ -59,9 +59,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.TextFormat;
 import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.ManagedChannel;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
@@ -180,6 +177,7 @@ public class FnHarness {
     }
     OutboundObserverFactory outboundObserverFactory =
         HarnessStreamObserverFactories.fromOptions(options);
+
     main(
         id,
         options,
@@ -188,7 +186,8 @@ public class FnHarness {
         controlApiServiceDescriptor,
         statusApiServiceDescriptor,
         channelFactory,
-        outboundObserverFactory);
+        outboundObserverFactory,
+        Caches.fromOptions(options));
   }
 
   /**
@@ -203,6 +202,7 @@ public class FnHarness {
    * @param statusApiServiceDescriptor
    * @param channelFactory
    * @param outboundObserverFactory
+   * @param processWideCache
    * @throws Exception
    */
   public static void main(
@@ -213,7 +213,8 @@ public class FnHarness {
       Endpoints.ApiServiceDescriptor controlApiServiceDescriptor,
       Endpoints.ApiServiceDescriptor statusApiServiceDescriptor,
       ManagedChannelFactory channelFactory,
-      OutboundObserverFactory outboundObserverFactory)
+      OutboundObserverFactory outboundObserverFactory,
+      Cache<?, ?> processWideCache)
       throws Exception {
     channelFactory =
         channelFactory.withInterceptors(ImmutableList.of(AddHarnessIdInterceptor.create(id)));
@@ -226,7 +227,6 @@ public class FnHarness {
     try (BeamFnLoggingClient logging =
         new BeamFnLoggingClient(
             options, loggingApiServiceDescriptor, channelFactory::forDescriptor)) {
-
       LOG.info("Fn Harness started");
       // Register standard file systems.
       FileSystems.setDefaultPipelineOptions(options);
@@ -250,20 +250,24 @@ public class FnHarness {
       FinalizeBundleHandler finalizeBundleHandler =
           new FinalizeBundleHandler(options.as(GcsOptions.class).getExecutorService());
 
-      LoadingCache<String, BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors =
-          CacheBuilder.newBuilder()
-              .maximumSize(1000)
-              .expireAfterAccess(Duration.ofMinutes(10))
-              .build(
-                  new CacheLoader<String, BeamFnApi.ProcessBundleDescriptor>() {
-                    @Override
-                    public BeamFnApi.ProcessBundleDescriptor load(String id) {
-                      return blockingControlStub.getProcessBundleDescriptor(
-                          BeamFnApi.GetProcessBundleDescriptorRequest.newBuilder()
-                              .setProcessBundleDescriptorId(id)
-                              .build());
-                    }
-                  });
+      // Resolves process bundle descriptors by id. Descriptors are kept in a sub-cache of the
+      // process wide cache; an id that is not cached is fetched over the control stub.
+      Function<String, BeamFnApi.ProcessBundleDescriptor> getProcessBundleDescriptor =
+          new Function<String, ProcessBundleDescriptor>() {
+            // Key prefix which scopes the descriptors within the process wide cache.
+            private static final String PROCESS_BUNDLE_DESCRIPTORS = "ProcessBundleDescriptors";
+            private final Cache<String, BeamFnApi.ProcessBundleDescriptor> cache =
+                Caches.subCache(processWideCache, PROCESS_BUNDLE_DESCRIPTORS);
+
+            @Override
+            public BeamFnApi.ProcessBundleDescriptor apply(String id) {
+              return cache.computeIfAbsent(id, this::loadDescriptor);
+            }
+
+            // Requests the descriptor with the given id through the blocking control stub.
+            private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) {
+              return blockingControlStub.getProcessBundleDescriptor(
+                  BeamFnApi.GetProcessBundleDescriptorRequest.newBuilder()
+                      .setProcessBundleDescriptorId(id)
+                      .build());
+            }
+          };
 
       MetricsEnvironment.setProcessWideContainer(MetricsContainerImpl.createProcessWideContainer());
 
@@ -271,7 +275,7 @@ public class FnHarness {
           new ProcessBundleHandler(
               options,
               runnerCapabilites,
-              processBundleDescriptors::getUnchecked,
+              getProcessBundleDescriptor,
               beamFnDataMultiplexer,
               beamFnStateGrpcClientCache,
               finalizeBundleHandler,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 0605ac3..1f28523 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -86,7 +86,6 @@ import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.Message;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.TextFormat;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
@@ -163,7 +162,7 @@ public class ProcessBundleHandler {
           };
 
   private final PipelineOptions options;
-  private final Function<String, Message> fnApiRegistry;
+  private final Function<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry;
   private final BeamFnDataClient beamFnDataClient;
   private final BeamFnStateGrpcClientCache beamFnStateGrpcClientCache;
   private final LoadingCache<
@@ -180,7 +179,7 @@ public class ProcessBundleHandler {
   public ProcessBundleHandler(
       PipelineOptions options,
       Set<String> runnerCapabilities,
-      Function<String, Message> fnApiRegistry,
+      Function<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry,
       BeamFnDataClient beamFnDataClient,
       BeamFnStateGrpcClientCache beamFnStateGrpcClientCache,
       FinalizeBundleHandler finalizeBundleHandler,
@@ -201,7 +200,7 @@ public class ProcessBundleHandler {
   ProcessBundleHandler(
       PipelineOptions options,
       Set<String> runnerCapabilities,
-      Function<String, Message> fnApiRegistry,
+      Function<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry,
       BeamFnDataClient beamFnDataClient,
       BeamFnStateGrpcClientCache beamFnStateGrpcClientCache,
       FinalizeBundleHandler finalizeBundleHandler,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java
new file mode 100644
index 0000000..ce101dc
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Caches}: no-op, eternal and options-based caches, plus sub-cache views. */
+@RunWith(JUnit4.class)
+public class CachesTest {
+  @Test
+  public void testNoopCache() { // a no-op cache never retains entries
+    Cache<String, String> cache = Caches.noop();
+    cache.put("key", "value");
+    assertNull(cache.peek("key"));
+    assertEquals("value", cache.computeIfAbsent("key", (unused) -> "value"));
+    assertNull(cache.peek("key"));
+    assertThat(cache.keys(), is(emptyIterable()));
+  }
+
+  @Test
+  public void testEternalCache() { // shared checks against Caches.eternal()
+    testCache(Caches.eternal());
+  }
+
+  @Test
+  public void testDefaultCache() { // shared checks against Caches.fromOptions(...)
+    testCache(Caches.fromOptions(PipelineOptionsFactory.create()));
+  }
+
+  @Test
+  public void testSubCache() { // shared checks against a sub-cache of an eternal cache
+    testCache(Caches.subCache(Caches.eternal(), "prefix"));
+  }
+
+  @Test
+  public void testSiblingSubCaches() { // same-prefix views share entries; other prefixes don't
+    Cache<String, String> parent = Caches.eternal();
+    Cache<String, String> cacheA = Caches.subCache(parent, "prefixA");
+    Cache<String, String> cacheACopy = Caches.subCache(parent, "prefixA");
+    Cache<String, String> cacheB = Caches.subCache(parent, "prefixB");
+
+    // Test values inserted into caches with the same prefix can be found in other instances with
+    // the same prefix but not in the cache with a different prefix
+    cacheA.put("keyA", "valueA");
+    assertEquals("valueA", cacheA.peek("keyA"));
+    assertEquals("valueA", cacheACopy.peek("keyA"));
+    assertNull(cacheB.peek("keyA"));
+
+    // Test clearing a cache with a different prefix doesn't impact entries under other prefixes
+    cacheB.clear();
+    assertEquals("valueA", cacheA.peek("keyA"));
+    assertEquals("valueA", cacheACopy.peek("keyA"));
+
+    // Test clearing a cache with the same prefix impacts other instances with that prefix
+    cacheACopy.clear();
+    assertNull(cacheA.peek("keyA"));
+    assertNull(cacheACopy.peek("keyA"));
+  }
+
+  @Test
+  public void testNestedSubCaches() { // a sub-cache of a sub-cache keeps its own key space
+    Cache<String, String> parent = Caches.eternal();
+    Cache<String, String> child = Caches.subCache(parent, "child");
+    Cache<String, String> childOfChild = Caches.subCache(child, "childOfChild");
+
+    // Test nested put: the same key holds independent values at each nesting level
+    child.put("keyA", "childA");
+    childOfChild.put("keyA", "childOfChildA");
+    assertEquals("childA", child.peek("keyA"));
+    assertEquals("childOfChildA", childOfChild.peek("keyA"));
+
+    // Test nested computeIfAbsent: each nesting level loads and keeps its own value
+    child.computeIfAbsent("keyB", (unused) -> "childB");
+    childOfChild.computeIfAbsent("keyB", (unused) -> "childOfChildB");
+    assertEquals("childB", child.peek("keyB"));
+    assertEquals("childOfChildB", childOfChild.peek("keyB"));
+
+    // Test removal from a sub-cache doesn't impact its nested sub-cache
+    child.remove("keyA");
+    assertNull(child.peek("keyA"));
+    assertEquals("childOfChildA", childOfChild.peek("keyA"));
+
+    // Test removal from a nested sub-cache doesn't impact the sub-cache it was created from
+    childOfChild.remove("keyB");
+    assertEquals("childB", child.peek("keyB"));
+    assertNull(childOfChild.peek("keyB"));
+
+    // Test that clearing the middle cache empties its nested sub-cache but not the parent
+    parent.put("keyA", "parentA");
+    parent.put("keyB", "parentB");
+    child.clear();
+    assertThat(child.keys(), is(emptyIterable()));
+    assertThat(childOfChild.keys(), is(emptyIterable()));
+    assertEquals("parentA", parent.peek("keyA"));
+    assertEquals("parentB", parent.peek("keyB"));
+  }
+
+  private void testCache(Cache<String, String> cache) { // shared checks, needs empty cache
+    assertNull(cache.peek("key1"));
+
+    // Test put
+    cache.put("key1", "value1");
+    assertEquals("value1", cache.peek("key1"));
+
+    // Test compute without load: the existing value wins over the loader's value
+    assertEquals("value1", cache.computeIfAbsent("key1", (unused) -> "anotherValue"));
+    assertEquals("value1", cache.peek("key1"));
+
+    // Test compute with load: the loaded value is returned and retained
+    assertEquals("value2", cache.computeIfAbsent("key2", (unused) -> "value2"));
+    assertEquals("value2", cache.peek("key2"));
+
+    assertThat(cache.keys(), containsInAnyOrder("key1", "key2"));
+
+    // Test removal: only the removed key disappears
+    cache.remove("key1");
+    assertNull(cache.peek("key1"));
+    assertEquals("value2", cache.peek("key2"));
+    assertThat(cache.keys(), containsInAnyOrder("key2"));
+
+    // Test clear: no keys remain afterwards
+    cache.clear();
+    assertNull(cache.peek("key1"));
+    assertNull(cache.peek("key2"));
+    assertThat(cache.keys(), is(emptyIterable()));
+  }
+}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index d609b2f..e8c6678 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -117,7 +117,6 @@ import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.Message;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
@@ -385,7 +384,8 @@ public class ProcessBundleHandlerTest {
                     .build())
             .putPcollections("2L-output-pc", RunnerApi.PCollection.getDefaultInstance())
             .build();
-    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+    Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+        ImmutableMap.of("1L", processBundleDescriptor);
 
     List<RunnerApi.PTransform> transformsProcessed = new ArrayList<>();
     List<String> orderOfOperations = new ArrayList<>();
@@ -505,7 +505,8 @@ public class ProcessBundleHandlerTest {
                             .build())
                     .build())
             .build();
-    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+    Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+        ImmutableMap.of("1L", processBundleDescriptor);
 
     Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
         Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
@@ -557,7 +558,8 @@ public class ProcessBundleHandlerTest {
                     .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
                     .build())
             .build();
-    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+    Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+        ImmutableMap.of("1L", processBundleDescriptor);
 
     ProcessBundleHandler handler =
         new ProcessBundleHandler(
@@ -685,7 +687,8 @@ public class ProcessBundleHandlerTest {
                     .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
                     .build())
             .build();
-    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+    Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+        ImmutableMap.of("1L", processBundleDescriptor);
 
     ProcessBundleHandler handler =
         new ProcessBundleHandler(
@@ -724,7 +727,8 @@ public class ProcessBundleHandlerTest {
                     .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
                     .build())
             .build();
-    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+    Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+        ImmutableMap.of("1L", processBundleDescriptor);
     FinalizeBundleHandler mockFinalizeBundleHandler = mock(FinalizeBundleHandler.class);
     BundleFinalizer.Callback mockCallback = mock(BundleFinalizer.Callback.class);
 
@@ -780,7 +784,8 @@ public class ProcessBundleHandlerTest {
                     .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
                     .build())
             .build();
-    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+    Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+        ImmutableMap.of("1L", processBundleDescriptor);
 
     ProcessBundleHandler handler =
         new ProcessBundleHandler(
@@ -919,7 +924,8 @@ public class ProcessBundleHandlerTest {
                     .addComponentCoderIds("window-strategy-coder")
                     .build())
             .build();
-    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+    Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+        ImmutableMap.of("1L", processBundleDescriptor);
 
     Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap =
         Maps.newHashMap(REGISTERED_RUNNER_FACTORIES);
@@ -1193,7 +1199,8 @@ public class ProcessBundleHandlerTest {
                     .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
                     .build())
             .build();
-    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+    Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+        ImmutableMap.of("1L", processBundleDescriptor);
 
     Mockito.doAnswer(
             (invocation) -> {
@@ -1256,7 +1263,8 @@ public class ProcessBundleHandlerTest {
                     .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
                     .build())
             .build();
-    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+    Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+        ImmutableMap.of("1L", processBundleDescriptor);
 
     Mockito.doAnswer(
             (invocation) -> {
@@ -1329,7 +1337,8 @@ public class ProcessBundleHandlerTest {
                     .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
                     .build())
             .build();
-    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+    Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+        ImmutableMap.of("1L", processBundleDescriptor);
 
     ProcessBundleHandler handler =
         new ProcessBundleHandler(
@@ -1375,7 +1384,8 @@ public class ProcessBundleHandlerTest {
                     .build())
             .setStateApiServiceDescriptor(ApiServiceDescriptor.getDefaultInstance())
             .build();
-    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+    Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+        ImmutableMap.of("1L", processBundleDescriptor);
 
     CompletableFuture<StateResponse>[] successfulResponse = new CompletableFuture[1];
     CompletableFuture<StateResponse>[] unsuccessfulResponse = new CompletableFuture[1];
@@ -1460,7 +1470,8 @@ public class ProcessBundleHandlerTest {
                     .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
                     .build())
             .build();
-    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+    Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+        ImmutableMap.of("1L", processBundleDescriptor);
 
     ProcessBundleHandler handler =
         new ProcessBundleHandler(
@@ -1509,7 +1520,8 @@ public class ProcessBundleHandlerTest {
                     .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
                     .build())
             .build();
-    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+    Map<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry =
+        ImmutableMap.of("1L", processBundleDescriptor);
 
     ProcessBundleHandler handler =
         new ProcessBundleHandler(