Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/03 19:18:35 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #15238: [WIP] Add MapState and SetState support

lukecwik commented on a change in pull request #15238:
URL: https://github.com/apache/beam/pull/15238#discussion_r681973796



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -2172,22 +2170,6 @@ static void verifyDoFnSupported(DoFn<?, ?> fn, boolean streaming, boolean stream
               "%s does not currently support @RequiresTimeSortedInput in streaming mode.",
               DataflowRunner.class.getSimpleName()));
     }
-    if (DoFnSignatures.usesSetState(fn)) {

Review comment:
       Support for this would only exist on runner v2, so rather than removing the check we should make it conditional and skip it only when `useUnifiedWorker` is also `true`.
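
One way to read this suggestion, as a sketch only: keep the SetState check but skip it when the unified worker (runner v2) is in use. The class and method below are hypothetical stand-ins. Plain booleans replace `DoFnSignatures.usesSetState(fn)` and the `useUnifiedWorker` flag, and the message text is illustrative rather than the one used in `DataflowRunner`.

```java
public class SetStateSupportCheck {
  /**
   * Hypothetical stand-in for the check in verifyDoFnSupported: rejects SetState
   * unless the pipeline runs on the unified worker (runner v2).
   */
  static void verifySetStateSupported(boolean usesSetState, boolean useUnifiedWorker) {
    if (usesSetState && !useUnifiedWorker) {
      throw new UnsupportedOperationException(
          "SetState is only supported when the unified worker (runner v2) is enabled.");
    }
  }

  public static void main(String[] args) {
    verifySetStateSupported(true, true); // accepted: SetState on runner v2
    verifySetStateSupported(false, false); // accepted: SetState not used
    try {
      verifySetStateSupported(true, false);
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```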

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Boolean> impl =
+                      createMultimapUserState(id, elemCoder, BooleanCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> addIfAbsent(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {

Review comment:
       If `read` is never called on the returned `ReadableState`, will the value be added?
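
To make the concern concrete, here is a minimal sketch contrasting a lazy and an eager `addIfAbsent`. It assumes a tiny hypothetical `ReadableState` interface with only `read()` and a plain `HashSet` in place of the multimap-backed state; it is not the harness implementation.

```java
import java.util.HashSet;
import java.util.Set;

public class AddIfAbsentSketch {
  /** Minimal stand-in for Beam's ReadableState; only read() is modeled. */
  interface ReadableState<T> {
    T read();
  }

  private final Set<String> elements = new HashSet<>();

  /** Lazy variant: the element is only added if the caller invokes read(). */
  ReadableState<Boolean> addIfAbsentLazy(String t) {
    return () -> elements.add(t);
  }

  /** Eager variant: the element is added immediately; read() only reports the outcome. */
  ReadableState<Boolean> addIfAbsentEager(String t) {
    boolean added = elements.add(t);
    return () -> added;
  }

  boolean contains(String t) {
    return elements.contains(t);
  }

  public static void main(String[] args) {
    AddIfAbsentSketch set = new AddIfAbsentSketch();
    set.addIfAbsentLazy("a"); // result ignored, read() never called
    set.addIfAbsentEager("b"); // result ignored as well
    System.out.println(set.contains("a")); // false: the lazy add never ran
    System.out.println(set.contains("b")); // true: the eager add already happened
  }
}
```

With the lazy variant, a caller that ignores the returned state silently drops the write, which is the case the question above is probing.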

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -334,7 +425,142 @@ public void clear() {
       StateSpec<MapState<KeyT, ValueT>> spec,
       Coder<KeyT> mapKeyCoder,
       Coder<ValueT> mapValueCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state to the Fn API.");
+    return (MapState<KeyT, ValueT>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new MapState<KeyT, ValueT>() {
+                  private final MultimapUserState<KeyT, ValueT> impl =
+                      createMultimapUserState(id, mapKeyCoder, mapValueCoder);
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public void put(KeyT key, ValueT value) {
+                    impl.remove(key);
+                    impl.put(key, value);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> computeIfAbsent(
+                      KeyT key, Function<? super KeyT, ? extends ValueT> mappingFunction) {
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        Iterable<ValueT> values = impl.get(key);
+                        if (Iterables.isEmpty(values)) {
+                          impl.put(key, mappingFunction.apply(key));
+                          return null;
+                        }
+                        return Iterables.getOnlyElement(values, null);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public void remove(KeyT key) {
+                    impl.remove(key);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> get(KeyT key) {
+                    return getOrDefault(key, null);
+                  }
+
+                  @Override
+                  public ReadableState<ValueT> getOrDefault(
+                      KeyT key, @Nullable ValueT defaultValue) {
+                    return new ReadableState<ValueT>() {
+                      @Override
+                      public @Nullable ValueT read() {
+                        Iterable<ValueT> values = impl.get(key);
+                        return Iterables.getOnlyElement(values, defaultValue);
+                      }
+
+                      @Override
+                      public ReadableState<ValueT> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<KeyT>> keys() {
+                    return new ReadableState<Iterable<KeyT>>() {
+                      @Override
+                      public @Nullable Iterable<KeyT> read() {
+                        return impl.keys();
+                      }
+
+                      @Override
+                      public ReadableState<Iterable<KeyT>> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<ValueT>> values() {
+                    return new ReadableState<Iterable<ValueT>>() {
+                      @Override
+                      public @Nullable Iterable<ValueT> read() {
+                        return Iterables.transform(entries().read(), e -> e.getValue());
+                      }
+
+                      @Override
+                      public ReadableState<Iterable<ValueT>> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Iterable<Map.Entry<KeyT, ValueT>>> entries() {
+                    return new ReadableState<Iterable<Map.Entry<KeyT, ValueT>>>() {
+                      @Override
+                      public @Nullable Iterable<Map.Entry<KeyT, ValueT>> read() {
+                        Iterable<KeyT> keys = keys().read();

Review comment:
       Consider using `Iterables.transform()` here as well.
   
   Guava provides an immutable map entry via `Maps.immutableEntry(key, value)`.
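
A rough sketch of the shape this could take: entries are produced lazily from the keys, one lookup per key as the caller iterates. To stay runnable without Guava on the classpath, the block uses a hand-written `transform` helper in place of `Iterables.transform` and `AbstractMap.SimpleImmutableEntry` in place of Guava's `Maps.immutableEntry`; the `lookup` function is a hypothetical stand-in for a per-key state read.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class EntriesViewSketch {
  /** Hand-written lazy view, standing in for Guava's Iterables.transform. */
  static <A, B> Iterable<B> transform(Iterable<A> source, Function<? super A, ? extends B> fn) {
    return () -> {
      Iterator<A> it = source.iterator();
      return new Iterator<B>() {
        @Override
        public boolean hasNext() {
          return it.hasNext();
        }

        @Override
        public B next() {
          return fn.apply(it.next()); // the function runs only when an element is requested
        }
      };
    };
  }

  /** Builds entries lazily from keys; lookup is a hypothetical per-key value fetch. */
  static <K, V> Iterable<Map.Entry<K, V>> entries(Iterable<K> keys, Function<K, V> lookup) {
    return transform(keys, k -> new AbstractMap.SimpleImmutableEntry<K, V>(k, lookup.apply(k)));
  }

  public static void main(String[] args) {
    Map<String, Integer> backing = new HashMap<>();
    backing.put("a", 1);
    backing.put("b", 2);
    List<String> keys = Arrays.asList("a", "b");
    Iterable<Map.Entry<String, Integer>> view = entries(keys, backing::get);
    for (Map.Entry<String, Integer> e : view) {
      System.out.println(e.getKey() + "=" + e.getValue());
    }
  }
}
```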

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =
+      Lists.newArrayList(); // separate from pendingAdd since keys aren't ordered.
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.keyCoder = keyCoder;
+    this.valueCoder = valueCoder;
+    this.instructionId = instructionId;
+    this.pTransformId = pTransformId;
+    this.stateId = stateId;
+    this.encodedWindow = encodedWindow;
+    this.encodedKey = encodedKey;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+    pendingKeys = Lists.newArrayList();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues = pendingAdds.get(key);
+    if (isCleared || negativeCache.contains(key) || pendingRemoves.contains(key)) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size());
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    if (Iterables.isEmpty(persistedValues)) {
+      negativeCache.add(key);
+    }
+    return Iterables.concat(
+        persistedValues,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size()));
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterables containing the key from each key-value pair in this multimap, without collapsing duplicates.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    if (isCleared) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size());
+    }
+
+    Iterable<K> persistedKeys = getPersistedKeys();
+    persistedKeys = Iterables.filter(persistedKeys, key -> !pendingRemoves.contains(key));
+    return Iterables.concat(
+        persistedKeys,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size()));
+  }
+
+  /*
+   * Store a key-value pair in the multimap.
+   * Allows duplicate key-value pairs.
+   */
+  public void put(K key, V value) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.put(key, value);
+    pendingKeys.add(key);
+  }
+
+  /*
+   * Removes all values for this key in the multimap.
+   */
+  public void remove(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.removeAll(key);
+    pendingKeys.removeAll(Collections.singletonList(key));
+    pendingRemoves.add(key);
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  // Update data in persistent store
+  public void asyncClose() throws Exception {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    // Nothing to persist
+    if (!isCleared && pendingRemoves.isEmpty() && pendingAdds.isEmpty()) {
+      isClosed = true;
+      return;
+    }
+
+    // Clear currently persisted key-values
+    if (isCleared) {
+      for (K key : getPersistedKeys()) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setClear(StateClearRequest.getDefaultInstance()),
+            new CompletableFuture<>());
+      }
+    } else if (!pendingRemoves.isEmpty()) {
+      Iterable<K> removeKeys = Iterables.filter(getPersistedKeys(), pendingRemoves::contains);
+      for (K key : removeKeys) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setClear(StateClearRequest.getDefaultInstance()),
+            new CompletableFuture<>());
+      }
+    }
+
+    // Persist pending key-values
+    if (!pendingAdds.isEmpty()) {
+      for (K key : pendingAdds.keySet()) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setAppend(
+                    StateAppendRequest.newBuilder().setData(encodeValues(pendingAdds.get(key)))),
+            new CompletableFuture<>());
+      }
+    }
+
+    // Update keys
+    if (isCleared || !pendingRemoves.isEmpty()) {

Review comment:
       Not sure why we are clearing after `pendingAdds` has been processed.
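
If the concern is ordering, one possible arrangement (a sketch, not a claim about what the PR should do) is to issue every clear before any append, so that the key list is rewritten in the same pass as the values. The class below is hypothetical: strings stand in for `StateRequest`s, and keys are modeled as a set, which drops the duplicate-key bookkeeping that `pendingKeys` carries in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FlushOrderSketch {
  /** Returns the requests in the order they would be issued. */
  static List<String> planFlush(
      boolean isCleared,
      Set<String> persistedKeys,
      Set<String> pendingRemoves,
      Map<String, List<String>> pendingAdds) {
    List<String> requests = new ArrayList<>();
    boolean rewriteKeys = isCleared || !pendingRemoves.isEmpty();

    // Phase 1: all clears, for the per-key values and then for the key list.
    Set<String> survivingKeys = new LinkedHashSet<>();
    for (String key : persistedKeys) {
      if (isCleared || pendingRemoves.contains(key)) {
        requests.add("CLEAR values[" + key + "]");
      } else {
        survivingKeys.add(key);
      }
    }
    if (rewriteKeys) {
      requests.add("CLEAR keys");
    }

    // Phase 2: all appends, for the per-key values and then for the key list.
    for (Map.Entry<String, List<String>> add : pendingAdds.entrySet()) {
      requests.add("APPEND values[" + add.getKey() + "] " + add.getValue());
    }
    Set<String> keysToAppend = new LinkedHashSet<>();
    if (rewriteKeys) {
      keysToAppend.addAll(survivingKeys); // the key list was cleared, so re-add survivors
    }
    keysToAppend.addAll(pendingAdds.keySet());
    if (!keysToAppend.isEmpty()) {
      requests.add("APPEND keys " + keysToAppend);
    }
    return requests;
  }

  public static void main(String[] args) {
    Map<String, List<String>> adds = new LinkedHashMap<>();
    adds.put("c", Arrays.asList("1"));
    Set<String> persisted = new LinkedHashSet<>(Arrays.asList("a", "b"));
    for (String request : planFlush(false, persisted, Collections.singleton("a"), adds)) {
      System.out.println(request);
    }
  }
}
```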

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =
+      Lists.newArrayList(); // separate from pendingAdd since keys aren't ordered.
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.keyCoder = keyCoder;
+    this.valueCoder = valueCoder;
+    this.instructionId = instructionId;
+    this.pTransformId = pTransformId;
+    this.stateId = stateId;
+    this.encodedWindow = encodedWindow;
+    this.encodedKey = encodedKey;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+    pendingKeys = Lists.newArrayList();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues = pendingAdds.get(key);
+    if (isCleared || negativeCache.contains(key) || pendingRemoves.contains(key)) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size());
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    if (Iterables.isEmpty(persistedValues)) {
+      negativeCache.add(key);
+    }
+    return Iterables.concat(
+        persistedValues,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size()));
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterables containing the key from each key-value pair in this multimap, without collapsing duplicates.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    if (isCleared) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size());
+    }
+
+    Iterable<K> persistedKeys = getPersistedKeys();
+    persistedKeys = Iterables.filter(persistedKeys, key -> !pendingRemoves.contains(key));
+    return Iterables.concat(
+        persistedKeys,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size()));
+  }
+
+  /*
+   * Store a key-value pair in the multimap.
+   * Allows duplicate key-value pairs.
+   */
+  public void put(K key, V value) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.put(key, value);
+    pendingKeys.add(key);
+  }
+
+  /*
+   * Removes all values for this key in the multimap.
+   */
+  public void remove(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.removeAll(key);
+    pendingKeys.removeAll(Collections.singletonList(key));
+    pendingRemoves.add(key);
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  // Update data in persistent store
+  public void asyncClose() throws Exception {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    // Nothing to persist
+    if (!isCleared && pendingRemoves.isEmpty() && pendingAdds.isEmpty()) {
+      isClosed = true;
+      return;
+    }
+
+    // Clear currently persisted key-values
+    if (isCleared) {
+      for (K key : getPersistedKeys()) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setClear(StateClearRequest.getDefaultInstance()),
+            new CompletableFuture<>());
+      }
+    } else if (!pendingRemoves.isEmpty()) {
+      Iterable<K> removeKeys = Iterables.filter(getPersistedKeys(), pendingRemoves::contains);
+      for (K key : removeKeys) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setClear(StateClearRequest.getDefaultInstance()),
+            new CompletableFuture<>());
+      }
+    }
+
+    // Persist pending key-values
+    if (!pendingAdds.isEmpty()) {
+      for (K key : pendingAdds.keySet()) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setAppend(
+                    StateAppendRequest.newBuilder().setData(encodeValues(pendingAdds.get(key)))),
+            new CompletableFuture<>());
+      }
+    }
+
+    // Update keys
+    if (isCleared || !pendingRemoves.isEmpty()) {
+      beamFnStateClient.handle(
+          keysStateRequest.toBuilder().setClear(StateClearRequest.getDefaultInstance()),
+          new CompletableFuture<>());
+      beamFnStateClient.handle(
+          keysStateRequest
+              .toBuilder()
+              .setAppend(StateAppendRequest.newBuilder().setData(encodeKeys(keys())).build()),
+          new CompletableFuture<>());
+    } else {
+      beamFnStateClient.handle(
+          keysStateRequest
+              .toBuilder()
+              .setAppend(StateAppendRequest.newBuilder().setData(encodeKeys(pendingKeys)).build()),
+          new CompletableFuture<>());
+    }
+    isClosed = true;
+  }
+
+  private ByteString encodeKeys(Iterable<K> keys) {
+    try {
+      ByteString.Output output = ByteString.newOutput();
+      for (K key : keys) {
+        keyCoder.encode(key, output);
+      }
+      return output.toByteString();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode keys for multimap user state id %s.", stateId), e);
+    }
+  }
+
+  private ByteString encodeValues(Iterable<V> values) {
+    try {
+      ByteString.Output output = ByteString.newOutput();
+      for (V value : values) {
+        valueCoder.encode(value, output);
+      }
+      return output.toByteString();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode values for multimap user state id %s.", stateId), e);
+    }
+  }
+
+  private StateRequest createUserStateRequest(K key) {
+    ByteString.Output keyStream = ByteString.newOutput();
+    try {
+      encodedKey.writeTo(keyStream);
+      keyCoder.encode(key, keyStream);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode key %s for multimap user state id %s.", key, stateId), e);
+    }
+
+    StateRequest.Builder request = StateRequest.newBuilder();
+    request
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow)
+        .setKey(keyStream.toByteString());
+    return request.build();
+  }
+
+  private Iterable<V> getPersistedValues(K key) {
+    if (persistedValues.get(key).isEmpty()) {

Review comment:
       If the iterable is empty, we will keep issuing a new request to the runner even though we already know it is empty. We should differentiate between the unknown and known-empty cases for `persistedValues`.
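
A minimal sketch of one way to separate the two cases: use the absence of a map entry to mean "never fetched" and a cached empty list to mean "fetched and known to be empty". The class, the `fetchFromRunner` function, and the `fetchCount` counter are hypothetical and only stand in for the state request made by `getPersistedValues`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class PersistedValuesCacheSketch<K, V> {
  // Hypothetical stand-in for a state request to the runner.
  private final Function<K, List<V>> fetchFromRunner;
  // No entry means "unknown"; an entry with an empty list means "known to be empty".
  private final Map<K, List<V>> cache = new HashMap<>();
  int fetchCount = 0;

  PersistedValuesCacheSketch(Function<K, List<V>> fetchFromRunner) {
    this.fetchFromRunner = fetchFromRunner;
  }

  List<V> getPersistedValues(K key) {
    List<V> cached = cache.get(key);
    if (cached == null) {
      // Unknown: fetch once and remember the result, even when it is empty.
      fetchCount++;
      cached = Collections.unmodifiableList(new ArrayList<>(fetchFromRunner.apply(key)));
      cache.put(key, cached);
    }
    return cached;
  }

  public static void main(String[] args) {
    PersistedValuesCacheSketch<String, Integer> state =
        new PersistedValuesCacheSketch<>(k -> Collections.<Integer>emptyList());
    state.getPersistedValues("missing");
    state.getPersistedValues("missing");
    System.out.println(state.fetchCount); // 1: the second call is served from the cache
  }
}
```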

##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -721,14 +721,36 @@ message StateKey {
     bytes key = 4;
   }
 
+  message MultimapKeysUserState {
+    // (Required) The id of the PTransform containing user state.
+    string transform_id = 1;
+    // (Required) The id of the user state.
+    string user_state_id = 2;
+    // (Required) The window encoded in a nested context.
+    bytes window = 3;

Review comment:
       Need a key representing the processing key here.

##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -721,14 +721,36 @@ message StateKey {
     bytes key = 4;
   }
 
+  message MultimapKeysUserState {
+    // (Required) The id of the PTransform containing user state.
+    string transform_id = 1;
+    // (Required) The id of the user state.
+    string user_state_id = 2;
+    // (Required) The window encoded in a nested context.
+    bytes window = 3;
+  }
+
+  message MultimapUserState {
+    // (Required) The id of the PTransform containing user state.
+    string transform_id = 1;
+    // (Required) The id of the user state.
+    string user_state_id = 2;
+    // (Required) The window encoded in a nested context.
+    bytes window = 3;
+    // (Required) The key of the currently executing element encoded in a
+    // nested context.
+    bytes key = 4;

Review comment:
       Need a lookup key here.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =
+      Lists.newArrayList(); // separate from pendingAdd since keys aren't ordered.
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> keyCoder,

Review comment:
       Consider naming this `mapKeyCoder` or `lookupKeyCoder`.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Boolean> impl =
+                      createMultimapUserState(id, elemCoder, BooleanCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> addIfAbsent(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        if (!contains(t).read()) {
+                          add(t);
+                          return true;
+                        }
+                        return false;
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public void remove(T t) {
+                    impl.remove(t);
+                  }
+
+                  @Override
+                  public void add(T value) {
+                    impl.remove(value);

Review comment:
       addIfAbsent().read()?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =
+      Lists.newArrayList(); // separate from pendingAdd since keys aren't ordered.
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.keyCoder = keyCoder;
+    this.valueCoder = valueCoder;
+    this.instructionId = instructionId;
+    this.pTransformId = pTransformId;
+    this.stateId = stateId;
+    this.encodedWindow = encodedWindow;
+    this.encodedKey = encodedKey;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+    pendingKeys = Lists.newArrayList();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues = pendingAdds.get(key);
+    if (isCleared || negativeCache.contains(key) || pendingRemoves.contains(key)) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size());
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    if (Iterables.isEmpty(persistedValues)) {
+      negativeCache.add(key);
+    }
+    return Iterables.concat(
+        persistedValues,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size()));
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterables containing the key from each key-value pair in this multimap, without collapsing duplicates.

Review comment:
       I believe we should guarantee that there are no duplicates, so that we better replicate the Java `Map` interface, which most people are familiar with.
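
       A minimal sketch of what duplicate-free keys could look like, using only `java.util`. The class `DistinctKeysSketch` and the method `distinctKeys` are hypothetical names for illustration and are not part of this PR; the three parameters mirror the `persistedKeys`, `pendingRemoves` and `pendingKeys` fields above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the PR: shows one way to return each key once.
final class DistinctKeysSketch {

  /** Merges persisted and pending keys, skipping removed keys and collapsing duplicates. */
  static <K> List<K> distinctKeys(
      List<K> persistedKeys, Set<K> pendingRemoves, List<K> pendingKeys) {
    // LinkedHashSet drops duplicates while keeping first-seen order.
    Set<K> result = new LinkedHashSet<>();
    for (K key : persistedKeys) {
      if (!pendingRemoves.contains(key)) {
        result.add(key);
      }
    }
    // Keys added in this bundle are appended; repeats are ignored by the set.
    result.addAll(pendingKeys);
    return new ArrayList<>(result);
  }

  public static void main(String[] args) {
    List<String> persisted = Arrays.asList("a", "b", "c");
    Set<String> removed = new HashSet<>(Arrays.asList("b"));
    List<String> pending = Arrays.asList("c", "d", "d");
    System.out.println(distinctKeys(persisted, removed, pending));
  }
}
```

       This materializes the keys eagerly, which the lazy `Iterables.concat` in the PR avoids, so treat it as an illustration of the contract rather than a proposed implementation.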

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =
+      Lists.newArrayList(); // separate from pendingAdd since keys aren't ordered.
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.keyCoder = keyCoder;
+    this.valueCoder = valueCoder;
+    this.instructionId = instructionId;
+    this.pTransformId = pTransformId;
+    this.stateId = stateId;
+    this.encodedWindow = encodedWindow;
+    this.encodedKey = encodedKey;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+    pendingKeys = Lists.newArrayList();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues = pendingAdds.get(key);
+    if (isCleared || negativeCache.contains(key) || pendingRemoves.contains(key)) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size());
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    if (Iterables.isEmpty(persistedValues)) {
+      negativeCache.add(key);
+    }
+    return Iterables.concat(
+        persistedValues,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size()));
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)

Review comment:
       Can we remove this suppression since this is new code?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Boolean> impl =
+                      createMultimapUserState(id, elemCoder, BooleanCoder.of());
+
+                  @Override
+                  public void clear() {
+                    impl.clear();
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> contains(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !Iterables.isEmpty(impl.get(t));
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> addIfAbsent(T t) {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        if (!contains(t).read()) {
+                          add(t);

Review comment:
       ```suggestion
                             impl.put(t, true);
   ```
   
   Saves on the remove call.
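
       To illustrate the saving, here is a simplified model built on `java.util` only. `SetOnMultimapSketch` is a hypothetical stand-in for the anonymous `SetState` above, and it assumes that `add` does a remove followed by a put (the diff context is cut off after the `impl.remove(value)` line, so that is a guess).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not part of the PR: a set stored as key -> list of marker values.
final class SetOnMultimapSketch<T> {
  private final Map<T, List<Boolean>> impl = new HashMap<>();
  int removeCalls = 0; // counts how often the backing remove is issued

  boolean contains(T t) {
    List<Boolean> values = impl.get(t);
    return values != null && !values.isEmpty();
  }

  private void put(T t) {
    impl.computeIfAbsent(t, k -> new ArrayList<>()).add(true);
  }

  /** Assumed shape of add(): remove any existing entry, then store a single marker. */
  void add(T t) {
    removeCalls++;
    impl.remove(t);
    put(t);
  }

  /** Absence was just checked, so the element can be stored without a remove. */
  boolean addIfAbsent(T t) {
    if (!contains(t)) {
      put(t);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    SetOnMultimapSketch<String> set = new SetOnMultimapSketch<>();
    System.out.println(set.addIfAbsent("x") + " removes=" + set.removeCalls);
  }
}
```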

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =

Review comment:
       Consider using `LinkedListMultimap` for `pendingAdds`, since it maintains insertion order for calls to `keys()`/`keySet()`, instead of storing `pendingKeys` separately.
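
       The idea can be sketched without Guava. The example below uses a `LinkedHashMap` of lists as a stand-in for `LinkedListMultimap` (an assumption made so the snippet stays self-contained); `OrderedPendingAdds` is a hypothetical name. The point is that an insertion-ordered container makes the separate `pendingKeys` list unnecessary.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an insertion-ordered multimap, not part of the PR.
final class OrderedPendingAdds<K, V> {
  // LinkedHashMap iterates keys in first-insertion order, so no extra key list is kept.
  private final Map<K, List<V>> pendingAdds = new LinkedHashMap<>();

  void put(K key, V value) {
    pendingAdds.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  void remove(K key) {
    pendingAdds.remove(key);
  }

  List<V> get(K key) {
    return new ArrayList<>(pendingAdds.getOrDefault(key, Collections.<V>emptyList()));
  }

  /** Distinct keys, in the order each key was first added. */
  List<K> keys() {
    return new ArrayList<>(pendingAdds.keySet());
  }

  public static void main(String[] args) {
    OrderedPendingAdds<String, Integer> adds = new OrderedPendingAdds<>();
    adds.put("b", 1);
    adds.put("a", 2);
    adds.put("b", 3);
    System.out.println(adds.keys() + " " + adds.get("b"));
  }
}
```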

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
##########
@@ -325,7 +329,94 @@ public void clear() {
 
   @Override
   public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException("TODO: Add support for a map state to the Fn API.");
+    return (SetState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createMultimapUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new SetState<T>() {
+                  private final MultimapUserState<T, Boolean> impl =
+                      createMultimapUserState(id, elemCoder, BooleanCoder.of());

Review comment:
       Consider using `VoidCoder.of()` and storing `null` instead for the value.

##########
File path: model/fn-execution/src/main/proto/beam_fn_api.proto
##########
@@ -721,14 +721,36 @@ message StateKey {
     bytes key = 4;
   }
 
+  message MultimapKeysUserState {

Review comment:
       We should document that Get returns the keys, Clear removes all keys, and Append is an error.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =
+      Lists.newArrayList(); // separate from pendingAdd since keys aren't ordered.
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.keyCoder = keyCoder;
+    this.valueCoder = valueCoder;
+    this.instructionId = instructionId;
+    this.pTransformId = pTransformId;
+    this.stateId = stateId;
+    this.encodedWindow = encodedWindow;
+    this.encodedKey = encodedKey;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+    pendingKeys = Lists.newArrayList();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues = pendingAdds.get(key);
+    if (isCleared || negativeCache.contains(key) || pendingRemoves.contains(key)) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size());
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    if (Iterables.isEmpty(persistedValues)) {
+      negativeCache.add(key);
+    }
+    return Iterables.concat(
+        persistedValues,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size()));
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterables containing the key from each key-value pair in this multimap, without collapsing duplicates.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    if (isCleared) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size());
+    }
+
+    Iterable<K> persistedKeys = getPersistedKeys();
+    persistedKeys = Iterables.filter(persistedKeys, key -> !pendingRemoves.contains(key));
+    return Iterables.concat(
+        persistedKeys,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size()));
+  }
+
+  /*
+   * Store a key-value pair in the multimap.
+   * Allows duplicate key-value pairs.
+   */
+  public void put(K key, V value) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.put(key, value);
+    pendingKeys.add(key);
+  }
+
+  /*
+   * Removes all values for this key in the multimap.
+   */
+  public void remove(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.removeAll(key);
+    pendingKeys.removeAll(Collections.singletonList(key));
+    pendingRemoves.add(key);

Review comment:
       ```suggestion
       if (!isCleared) {
         pendingRemoves.add(key);
       }
   ```
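
       A small model of the guard, assuming (this is my reading, the close logic is not shown in this hunk) that once `clear()` has been called the whole persisted state is wiped anyway, so tracking individual removes afterwards is redundant. `RemoveAfterClearSketch` and `pendingRemoveCount` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the isCleared guard, not part of the PR.
final class RemoveAfterClearSketch<K> {
  private boolean isCleared;
  private final Set<K> pendingRemoves = new HashSet<>();

  void clear() {
    isCleared = true;
    pendingRemoves.clear();
  }

  void remove(K key) {
    // After clear(), nothing persisted survives, so there is nothing left to remove per key.
    if (!isCleared) {
      pendingRemoves.add(key);
    }
  }

  int pendingRemoveCount() {
    return pendingRemoves.size();
  }

  public static void main(String[] args) {
    RemoveAfterClearSketch<String> state = new RemoveAfterClearSketch<>();
    state.remove("a");
    state.clear();
    state.remove("b");
    System.out.println(state.pendingRemoveCount());
  }
}
```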

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =
+      Lists.newArrayList(); // separate from pendingAdds since keys aren't ordered.
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.keyCoder = keyCoder;
+    this.valueCoder = valueCoder;
+    this.instructionId = instructionId;
+    this.pTransformId = pTransformId;
+    this.stateId = stateId;
+    this.encodedWindow = encodedWindow;
+    this.encodedKey = encodedKey;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+    pendingKeys = Lists.newArrayList();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues = pendingAdds.get(key);
+    if (isCleared || negativeCache.contains(key) || pendingRemoves.contains(key)) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size());
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    if (Iterables.isEmpty(persistedValues)) {
+      negativeCache.add(key);
+    }
+    return Iterables.concat(
+        persistedValues,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size()));
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterable containing the key from each key-value pair in this multimap, without collapsing duplicates.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    if (isCleared) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size());
+    }
+
+    Iterable<K> persistedKeys = getPersistedKeys();
+    persistedKeys = Iterables.filter(persistedKeys, key -> !pendingRemoves.contains(key));
+    return Iterables.concat(
+        persistedKeys,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size()));
+  }
+
+  /*
+   * Store a key-value pair in the multimap.
+   * Allows duplicate key-value pairs.
+   */
+  public void put(K key, V value) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.put(key, value);
+    pendingKeys.add(key);
+  }
+
+  /*
+   * Removes all values for this key in the multimap.
+   */
+  public void remove(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.removeAll(key);
+    pendingKeys.removeAll(Collections.singletonList(key));
+    pendingRemoves.add(key);
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  // Update data in persistent store
+  public void asyncClose() throws Exception {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    // Nothing to persist
+    if (!isCleared && pendingRemoves.isEmpty() && pendingAdds.isEmpty()) {
+      isClosed = true;
+      return;
+    }
+
+    // Clear currently persisted key-values
+    if (isCleared) {
+      for (K key : getPersistedKeys()) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)

Review comment:
       Can't this be a single clear request addressed to the MultimapKeysUserState state key instead of one clear request per enumerated key?
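
       A rough sketch of the difference in request count, assuming the runner treats a clear on the keys state key as dropping every entry. `ClearRequestSketch` is hypothetical and its strings are only labels, not Fn API protos:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the requests issued at close time. */
final class ClearRequestSketch {
  /** Shape of the quoted hunk: one clear per persisted map key. */
  static List<String> perKeyClears(List<String> persistedKeys) {
    List<String> requests = new ArrayList<>();
    for (String key : persistedKeys) {
      requests.add("clear entry " + key);
    }
    return requests;
  }

  /** Suggested shape: one clear addressed to the keys state key. */
  static List<String> singleClear() {
    List<String> requests = new ArrayList<>();
    requests.add("clear all keys");
    return requests;
  }
}
```

       The per-key form also has to fetch the persisted key list before it can issue any clears, which the single request would avoid.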

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =
+      Lists.newArrayList(); // separate from pendingAdds since keys aren't ordered.
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.keyCoder = keyCoder;
+    this.valueCoder = valueCoder;
+    this.instructionId = instructionId;
+    this.pTransformId = pTransformId;
+    this.stateId = stateId;
+    this.encodedWindow = encodedWindow;
+    this.encodedKey = encodedKey;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+    pendingKeys = Lists.newArrayList();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues = pendingAdds.get(key);
+    if (isCleared || negativeCache.contains(key) || pendingRemoves.contains(key)) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size());
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    if (Iterables.isEmpty(persistedValues)) {
+      negativeCache.add(key);
+    }
+    return Iterables.concat(
+        persistedValues,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size()));
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterable containing the key from each key-value pair in this multimap, without collapsing duplicates.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    if (isCleared) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size());
+    }
+
+    Iterable<K> persistedKeys = getPersistedKeys();
+    persistedKeys = Iterables.filter(persistedKeys, key -> !pendingRemoves.contains(key));
+    return Iterables.concat(
+        persistedKeys,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size()));
+  }
+
+  /*
+   * Store a key-value pair in the multimap.
+   * Allows duplicate key-value pairs.
+   */
+  public void put(K key, V value) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.put(key, value);
+    pendingKeys.add(key);
+  }
+
+  /*
+   * Removes all values for this key in the multimap.
+   */
+  public void remove(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.removeAll(key);
+    pendingKeys.removeAll(Collections.singletonList(key));
+    pendingRemoves.add(key);
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  // Update data in persistent store
+  public void asyncClose() throws Exception {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    // Nothing to persist
+    if (!isCleared && pendingRemoves.isEmpty() && pendingAdds.isEmpty()) {
+      isClosed = true;
+      return;
+    }
+
+    // Clear currently persisted key-values
+    if (isCleared) {
+      for (K key : getPersistedKeys()) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setClear(StateClearRequest.getDefaultInstance()),
+            new CompletableFuture<>());
+      }
+    } else if (!pendingRemoves.isEmpty()) {
+      Iterable<K> removeKeys = Iterables.filter(getPersistedKeys(), pendingRemoves::contains);
+      for (K key : removeKeys) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setClear(StateClearRequest.getDefaultInstance()),
+            new CompletableFuture<>());
+      }
+    }
+
+    // Persist pending key-values
+    if (!pendingAdds.isEmpty()) {
+      for (K key : pendingAdds.keySet()) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setAppend(
+                    StateAppendRequest.newBuilder().setData(encodeValues(pendingAdds.get(key)))),
+            new CompletableFuture<>());
+      }
+    }
+
+    // Update keys
+    if (isCleared || !pendingRemoves.isEmpty()) {
+      beamFnStateClient.handle(
+          keysStateRequest.toBuilder().setClear(StateClearRequest.getDefaultInstance()),
+          new CompletableFuture<>());
+      beamFnStateClient.handle(
+          keysStateRequest
+              .toBuilder()
+              .setAppend(StateAppendRequest.newBuilder().setData(encodeKeys(keys())).build()),
+          new CompletableFuture<>());
+    } else {
+      beamFnStateClient.handle(
+          keysStateRequest
+              .toBuilder()
+              .setAppend(StateAppendRequest.newBuilder().setData(encodeKeys(pendingKeys)).build()),
+          new CompletableFuture<>());
+    }
+    isClosed = true;
+  }
+
+  private ByteString encodeKeys(Iterable<K> keys) {
+    try {
+      ByteString.Output output = ByteString.newOutput();
+      for (K key : keys) {
+        keyCoder.encode(key, output);
+      }
+      return output.toByteString();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode keys for multimap user state id %s.", stateId), e);
+    }
+  }
+
+  private ByteString encodeValues(Iterable<V> values) {
+    try {
+      ByteString.Output output = ByteString.newOutput();
+      for (V value : values) {
+        valueCoder.encode(value, output);
+      }
+      return output.toByteString();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode values for multimap user state id %s.", stateId), e);
+    }
+  }
+
+  private StateRequest createUserStateRequest(K key) {
+    ByteString.Output keyStream = ByteString.newOutput();
+    try {
+      encodedKey.writeTo(keyStream);

Review comment:
       You want to keep the processing key distinct from the lookup (map) key in the proto.
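
       A small illustration of what can go wrong when both keys share one byte stream, assuming the method goes on to append the encoded map key to `keyStream` (the hunk is cut off before that point) and that the encodings are not length-prefixed. `KeySeparationSketch` is hypothetical, and plain strings stand in for the encoded keys:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical illustration; byte arrays stand in for encoded keys. */
final class KeySeparationSketch {
  /** Writes both keys into one buffer, the way a shared output stream would. */
  static byte[] concatenated(String processingKey, String mapKey) {
    byte[] a = processingKey.getBytes(StandardCharsets.UTF_8);
    byte[] b = mapKey.getBytes(StandardCharsets.UTF_8);
    byte[] out = Arrays.copyOf(a, a.length + b.length);
    System.arraycopy(b, 0, out, a.length, b.length);
    return out;
  }

  /** Keeps the two keys in separate slots, as separate proto fields would. */
  static byte[][] separated(String processingKey, String mapKey) {
    return new byte[][] {
      processingKey.getBytes(StandardCharsets.UTF_8), mapKey.getBytes(StandardCharsets.UTF_8)
    };
  }
}
```

       In this sketch ("ab", "c") and ("a", "bc") collapse to the same concatenated bytes but stay distinct when held separately, and a receiver can only tell which part is the processing key in the second form.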

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();

Review comment:
       Unless we reduce the number of data structures and simplify their relationships, we should document how the various data structures relate to one another.
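
       One possible shape for that documentation, sketched on a hypothetical `String`-typed model of the pending-side fields only. The field comments and the invariant are read off the `put` and `remove` methods in this diff and are an assumption about the intended design, not a statement of it:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the pending-side fields; not the class in this PR. */
final class PendingStateSketch {
  // Values added locally and not yet persisted, grouped by key.
  private final Map<String, List<String>> pendingAdds = new HashMap<>();
  // One entry per pending put, in insertion order; mirrors the keys of pendingAdds.
  private final List<String> pendingKeys = new ArrayList<>();
  // Keys whose persisted values must be cleared at close; may overlap pendingAdds after a re-put.
  private final Set<String> pendingRemoves = new HashSet<>();

  void put(String key, String value) {
    pendingAdds.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    pendingKeys.add(key);
  }

  void remove(String key) {
    pendingAdds.remove(key);
    pendingKeys.removeIf(key::equals);
    pendingRemoves.add(key);
  }

  /** Invariant: pendingKeys holds exactly one entry per pending value. */
  boolean invariantHolds() {
    int pendingValueCount = 0;
    for (List<String> values : pendingAdds.values()) {
      pendingValueCount += values.size();
    }
    return pendingKeys.size() == pendingValueCount;
  }
}
```

       Writing the invariant down as a checkable method also makes it easy to assert in tests after each mutation.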

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =
+      Lists.newArrayList(); // separate from pendingAdds since keys aren't ordered.
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.keyCoder = keyCoder;
+    this.valueCoder = valueCoder;
+    this.instructionId = instructionId;
+    this.pTransformId = pTransformId;
+    this.stateId = stateId;
+    this.encodedWindow = encodedWindow;
+    this.encodedKey = encodedKey;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+    pendingKeys = Lists.newArrayList();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues = pendingAdds.get(key);
+    if (isCleared || negativeCache.contains(key) || pendingRemoves.contains(key)) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size());
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    if (Iterables.isEmpty(persistedValues)) {
+      negativeCache.add(key);
+    }
+    return Iterables.concat(
+        persistedValues,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size()));
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterable containing the key from each key-value pair in this multimap, without collapsing duplicates.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    if (isCleared) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size());
+    }
+
+    Iterable<K> persistedKeys = getPersistedKeys();
+    persistedKeys = Iterables.filter(persistedKeys, key -> !pendingRemoves.contains(key));
+    return Iterables.concat(
+        persistedKeys,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size()));
+  }
+
+  /*
+   * Store a key-value pair in the multimap.
+   * Allows duplicate key-value pairs.
+   */
+  public void put(K key, V value) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.put(key, value);
+    pendingKeys.add(key);
+  }
+
+  /*
+   * Removes all values for this key in the multimap.
+   */
+  public void remove(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.removeAll(key);
+    pendingKeys.removeAll(Collections.singletonList(key));

Review comment:
       Did you mean?
   ```suggestion
       pendingKeys.removeAll(key);
   ```
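
       For reference, `pendingKeys` is declared as a `List<K>` in this hunk, and `java.util.List.removeAll` takes a collection, whereas Guava's `Multimap.removeAll` takes a single key. A minimal sketch of how the two `List` removal calls differ, using a hypothetical `RemoveAllSketch` helper with `String` keys:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper contrasting List.remove(Object) with List.removeAll(Collection). */
final class RemoveAllSketch {
  /** Removes only the first occurrence of key. */
  static List<String> removeFirst(List<String> keys, String key) {
    List<String> copy = new ArrayList<>(keys);
    copy.remove(key);
    return copy;
  }

  /** Removes every occurrence of key; List.removeAll expects a collection argument. */
  static List<String> removeEvery(List<String> keys, String key) {
    List<String> copy = new ArrayList<>(keys);
    copy.removeAll(Collections.singletonList(key));
    return copy;
  }
}
```

       Because `put` appends the key once per value, only the every-occurrence form keeps `pendingKeys` free of a removed key.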

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =
+      Lists.newArrayList(); // separate from pendingAdd since keys aren't ordered.
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.keyCoder = keyCoder;
+    this.valueCoder = valueCoder;
+    this.instructionId = instructionId;
+    this.pTransformId = pTransformId;
+    this.stateId = stateId;
+    this.encodedWindow = encodedWindow;
+    this.encodedKey = encodedKey;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+    pendingKeys = Lists.newArrayList();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues = pendingAdds.get(key);
+    if (isCleared || negativeCache.contains(key) || pendingRemoves.contains(key)) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size());
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    if (Iterables.isEmpty(persistedValues)) {
+      negativeCache.add(key);
+    }
+    return Iterables.concat(
+        persistedValues,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size()));
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterables containing the key from each key-value pair in this multimap, without collapsing duplicates.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    if (isCleared) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size());
+    }
+
+    Iterable<K> persistedKeys = getPersistedKeys();
+    persistedKeys = Iterables.filter(persistedKeys, key -> !pendingRemoves.contains(key));
+    return Iterables.concat(
+        persistedKeys,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size()));
+  }
+
+  /*
+   * Store a key-value pair in the multimap.
+   * Allows duplicate key-value pairs.
+   */
+  public void put(K key, V value) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.put(key, value);
+    pendingKeys.add(key);
+  }
+
+  /*
+   * Removes all values for this key in the multimap.
+   */
+  public void remove(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+    pendingAdds.removeAll(key);
+    pendingKeys.removeAll(Collections.singletonList(key));
+    pendingRemoves.add(key);
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  // Update data in persistent store
+  public void asyncClose() throws Exception {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    // Nothing to persist
+    if (!isCleared && pendingRemoves.isEmpty() && pendingAdds.isEmpty()) {
+      isClosed = true;
+      return;
+    }
+
+    // Clear currently persisted key-values
+    if (isCleared) {
+      for (K key : getPersistedKeys()) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setClear(StateClearRequest.getDefaultInstance()),
+            new CompletableFuture<>());
+      }
+    } else if (!pendingRemoves.isEmpty()) {
+      Iterable<K> removeKeys = Iterables.filter(getPersistedKeys(), pendingRemoves::contains);
+      for (K key : removeKeys) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setClear(StateClearRequest.getDefaultInstance()),
+            new CompletableFuture<>());
+      }
+    }
+
+    // Persist pending key-values
+    if (!pendingAdds.isEmpty()) {
+      for (K key : pendingAdds.keySet()) {
+        beamFnStateClient.handle(
+            createUserStateRequest(key)
+                .toBuilder()
+                .setAppend(
+                    StateAppendRequest.newBuilder().setData(encodeValues(pendingAdds.get(key)))),
+            new CompletableFuture<>());
+      }
+    }
+
+    // Update keys
+    if (isCleared || !pendingRemoves.isEmpty()) {
+      beamFnStateClient.handle(
+          keysStateRequest.toBuilder().setClear(StateClearRequest.getDefaultInstance()),
+          new CompletableFuture<>());
+      beamFnStateClient.handle(
+          keysStateRequest
+              .toBuilder()
+              .setAppend(StateAppendRequest.newBuilder().setData(encodeKeys(keys())).build()),
+          new CompletableFuture<>());
+    } else {
+      beamFnStateClient.handle(
+          keysStateRequest
+              .toBuilder()
+              .setAppend(StateAppendRequest.newBuilder().setData(encodeKeys(pendingKeys)).build()),
+          new CompletableFuture<>());
+    }
+    isClosed = true;
+  }
+
+  private ByteString encodeKeys(Iterable<K> keys) {
+    try {
+      ByteString.Output output = ByteString.newOutput();
+      for (K key : keys) {
+        keyCoder.encode(key, output);
+      }
+      return output.toByteString();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode keys for multimap user state id %s.", stateId), e);
+    }
+  }
+
+  private ByteString encodeValues(Iterable<V> values) {
+    try {
+      ByteString.Output output = ByteString.newOutput();
+      for (V value : values) {
+        valueCoder.encode(value, output);
+      }
+      return output.toByteString();
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode values for multimap user state id %s.", stateId), e);
+    }
+  }
+
+  private StateRequest createUserStateRequest(K key) {
+    ByteString.Output keyStream = ByteString.newOutput();
+    try {
+      encodedKey.writeTo(keyStream);
+      keyCoder.encode(key, keyStream);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Failed to encode key %s for multimap user state id %s.", key, stateId), e);
+    }
+
+    StateRequest.Builder request = StateRequest.newBuilder();
+    request

Review comment:
       Reuse `keysStateRequest` via `keysStateRequest.toBuilder()` instead of setting all of these fields again.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of a multimap user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class MultimapUserState<K, V> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final Coder<K> keyCoder;
+  private final Coder<V> valueCoder;
+  private final String instructionId;
+  private final String pTransformId;
+  private final String stateId;
+  private final ByteString encodedWindow;
+  private final ByteString encodedKey;
+  private final StateRequest keysStateRequest;
+
+  private boolean isClosed;
+  private boolean isCleared;
+  private Set<K> pendingRemoves = Sets.newHashSet();
+  private Set<K> negativeCache = Sets.newHashSet();
+  private Multimap<K, V> pendingAdds = ArrayListMultimap.create();
+  private Multimap<K, V> persistedValues = ArrayListMultimap.create();
+  private @Nullable Iterable<K> persistedKeys = null;
+  private List<K> pendingKeys =
+      Lists.newArrayList(); // separate from pendingAdd since keys aren't ordered.
+
+  public MultimapUserState(
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      String pTransformId,
+      String stateId,
+      ByteString encodedWindow,
+      ByteString encodedKey,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.keyCoder = keyCoder;
+    this.valueCoder = valueCoder;
+    this.instructionId = instructionId;
+    this.pTransformId = pTransformId;
+    this.stateId = stateId;
+    this.encodedWindow = encodedWindow;
+    this.encodedKey = encodedKey;
+
+    StateRequest.Builder keysStateRequestBuilder = StateRequest.newBuilder();
+    keysStateRequestBuilder
+        .setInstructionId(instructionId)
+        .getStateKeyBuilder()
+        .getMultimapKeysUserStateBuilder()
+        .setTransformId(pTransformId)
+        .setUserStateId(stateId)
+        .setWindow(encodedWindow);
+    keysStateRequest = keysStateRequestBuilder.build();
+  }
+
+  public void clear() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    isCleared = true;
+    persistedValues = ArrayListMultimap.create();
+    persistedKeys = null;
+    pendingRemoves = Sets.newHashSet();
+    pendingAdds = ArrayListMultimap.create();
+    pendingKeys = Lists.newArrayList();
+  }
+
+  /*
+   * Returns an iterable of the values associated with key in this multimap, if any.
+   * If there are no values, this returns an empty collection, not null.
+   */
+  public Iterable<V> get(@NonNull K key) {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    Collection<V> pendingValues = pendingAdds.get(key);
+    if (isCleared || negativeCache.contains(key) || pendingRemoves.contains(key)) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size());
+    }
+
+    Iterable<V> persistedValues = getPersistedValues(key);
+    if (Iterables.isEmpty(persistedValues)) {
+      negativeCache.add(key);
+    }
+    return Iterables.concat(
+        persistedValues,
+        Iterables.limit(Iterables.unmodifiableIterable(pendingValues), pendingValues.size()));
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-12687)
+  })
+  /*
+   * Returns an iterables containing the key from each key-value pair in this multimap, without collapsing duplicates.
+   */
+  public Iterable<K> keys() {
+    checkState(
+        !isClosed,
+        "Multimap user state is no longer usable because it is closed for %s",
+        keysStateRequest.getStateKey());
+
+    if (isCleared) {
+      return Iterables.limit(Iterables.unmodifiableIterable(pendingKeys), pendingKeys.size());
+    }
+
+    Iterable<K> persistedKeys = getPersistedKeys();
+    persistedKeys = Iterables.filter(persistedKeys, key -> !pendingRemoves.contains(key));

Review comment:
       `Iterables.filter()` returns a lazy view; it is not resolved immediately. You'll need to create a copy of `pendingRemoves` so that future clear/write calls aren't reflected in an earlier read, as per the expectations of [MapState#remove](https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/state/MapState.html#remove-K-)
   
   You'll have similar issues with `pendingAdds` and its usage: a future clear/remove call must not modify the result of an existing `read()` call.
   
   It would be best to unit test this exhaustively, as we do in [BagUserStateTest.java](https://github.com/apache/beam/blob/master/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java)
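
To illustrate the view-versus-copy difference, here is a minimal JDK-only sketch. It is not code from the PR: `LazyViewDemo`, `lazyFilter`, `snapshotFilter`, and `toList` are hypothetical names, and `lazyFilter` is only a stand-in that mimics the lazy behaviour of Guava's `Iterables.filter` by re-running the predicate on every iteration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LazyViewDemo {

  // Hypothetical stand-in for a lazy filter view: the predicate is evaluated each time
  // the result is iterated, so it sees the current contents of `removed`.
  static <K> Iterable<K> lazyFilter(List<K> persisted, Set<K> removed) {
    return () -> persisted.stream().filter(k -> !removed.contains(k)).iterator();
  }

  // Snapshot variant: copy the removal set at read time so that later mutations of the
  // original set do not change what this read returns.
  static <K> Iterable<K> snapshotFilter(List<K> persisted, Set<K> removed) {
    Set<K> frozen = new HashSet<>(removed);
    return lazyFilter(persisted, frozen);
  }

  // Drains an Iterable into a list so the results can be compared or printed.
  static <K> List<K> toList(Iterable<K> iterable) {
    List<K> out = new ArrayList<>();
    iterable.forEach(out::add);
    return out;
  }

  public static void main(String[] args) {
    List<String> persistedKeys = Arrays.asList("a", "b", "c");
    Set<String> pendingRemoves = new HashSet<>();

    // Both reads happen before the remove.
    Iterable<String> view = lazyFilter(persistedKeys, pendingRemoves);
    Iterable<String> snapshot = snapshotFilter(persistedKeys, pendingRemoves);

    pendingRemoves.add("b"); // simulates a remove("b") issued after the reads

    System.out.println(toList(view));     // [a, c]    (the earlier read changed)
    System.out.println(toList(snapshot)); // [a, b, c] (the earlier read is stable)
  }
}
```

A test along these lines (read, then mutate, then iterate the earlier read) is the kind of case the exhaustive unit tests should cover for `keys()` and `get()`.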



