You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/26 07:55:45 UTC

[flink-statefun] 01/06: [FLINK-16244] Add PendingAsyncOperations

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 165e5bf2862e512ae0820c49fae84af700b8904e
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Mon Feb 24 21:56:21 2020 +0100

    [FLINK-16244] Add PendingAsyncOperations
---
 .../core/functions/PendingAsyncOperations.java     | 131 ++++++++++++++
 .../core/functions/PendingAsyncOperationsTest.java | 189 +++++++++++++++++++++
 2 files changed, 320 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PendingAsyncOperations.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PendingAsyncOperations.java
new file mode 100644
index 0000000..29f6c85
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PendingAsyncOperations.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.functions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.statefun.flink.core.di.Inject;
+import org.apache.flink.statefun.flink.core.di.Label;
+import org.apache.flink.statefun.flink.core.message.Message;
+import org.apache.flink.statefun.flink.core.state.State;
+import org.apache.flink.statefun.sdk.Address;
+
+/**
+ * Tracks registered but not yet completed async operations; {@link #flush()} moves them from
+ * memory into the backing {@link MapState} when a checkpoint happens while they are in flight.
+ */
+final class PendingAsyncOperations {
+
+  /**
+   * Holds the currently in flight and not yet checkpointed async operations.
+   *
+   * <p>A key can be removed from this map in two ways:
+   *
+   * <ul>
+   *   <li>If a remove method was called explicitly (as a result of an async operation completion)
+   *   <li>A checkpoint happens while the async operation is in flight (flush is called). In that
+   *       case the key would be removed from the memoryStore, and written to the backingStore.
+   * </ul>
+   */
+  private final Map<Address, Map<Long, Message>> memoryStore = new HashMap<>();
+
+  /** The underlying backing state handle. */
+  private final MapState<Long, Message> backingStore;
+
+  /** Sets the current key (address) before the backing store is accessed on its behalf. */
+  private final Consumer<Address> keySetter;
+
+  @Inject
+  PendingAsyncOperations(
+      @Label("state") State state,
+      @Label("async-operations") MapState<Long, Message> backingStore) {
+    this(state::setCurrentKey, backingStore);
+  }
+
+  @VisibleForTesting
+  PendingAsyncOperations(Consumer<Address> keySetter, MapState<Long, Message> backingStore) {
+    this.backingStore = Objects.requireNonNull(backingStore);
+    this.keySetter = Objects.requireNonNull(keySetter);
+  }
+
+  /**
+   * Adds an uncompleted async operation.
+   *
+   * @param owningAddress the address that had registered the async operation
+   * @param futureId the futureId that is associated with that operation
+   * @param message the message that was registered with that operation
+   */
+  void add(Address owningAddress, long futureId, Message message) {
+    memoryStore.computeIfAbsent(owningAddress, unused -> new HashMap<>()).put(futureId, message);
+  }
+
+  /**
+   * Removes a completed async operation from memory, or from the backing store if it was flushed.
+   *
+   * <p>NOTE: this method should be called with {@link
+   * org.apache.flink.statefun.flink.core.state.State#setCurrentKey(Address)} set on the
+   * owningAddress. This should be the case as it is called by {@link
+   * AsyncMessageDecorator#postApply()}.
+   *
+   * @param owningAddress the address that had registered the async operation
+   * @param futureId the futureId that is associated with that operation
+   */
+  void remove(Address owningAddress, long futureId) {
+    Map<Long, Message> asyncOps = memoryStore.get(owningAddress);
+    if (asyncOps == null || asyncOps.remove(futureId) == null) {
+      // not found in memory, therefore it must have been previously flushed to the backing store.
+      removeFromTheBackingStore(owningAddress, futureId);
+    }
+    if (asyncOps != null && asyncOps.isEmpty()) {
+      // no async operations are left for this address, so drop its (now empty) map.
+      memoryStore.remove(owningAddress);
+    }
+  }
+
+  /** Moves the contents of the memoryStore into the backingStore. */
+  void flush() {
+    memoryStore.forEach(this::flushState);
+    memoryStore.clear();
+  }
+
+  /** Writes the pending operations of one address into the backing store, under its key. */
+  private void flushState(Address address, Map<Long, Message> perAddressState) {
+    keySetter.accept(address);
+    try {
+      backingStore.putAll(perAddressState);
+    } catch (Exception e) {
+      throw new IllegalStateException(
+          "Unable to persist a previously registered asynchronous operation for " + address, e);
+    }
+  }
+
+  /** Removes futureId from the backing store, relying on the key already set by the caller. */
+  private void removeFromTheBackingStore(Address address, long futureId) {
+    try {
+      this.backingStore.remove(futureId);
+    } catch (Exception e) {
+      throw new IllegalStateException(
+          "Unable to remove a registered asynchronous operation for " + address, e);
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/PendingAsyncOperationsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/PendingAsyncOperationsTest.java
new file mode 100644
index 0000000..62a75db
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/PendingAsyncOperationsTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.functions;
+
+import static org.apache.flink.statefun.flink.core.TestUtils.FUNCTION_1_ADDR;
+import static org.apache.flink.statefun.flink.core.TestUtils.FUNCTION_2_ADDR;
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Consumer;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.statefun.flink.core.TestUtils;
+import org.apache.flink.statefun.flink.core.message.Message;
+import org.apache.flink.statefun.sdk.Address;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+public class PendingAsyncOperationsTest {
+  private final MemoryMapState<Long, Message> miniStateBackend = new MemoryMapState<>();
+  private final Message dummyMessage = TestUtils.ENVELOPE_FACTORY.from(1);
+  // JUnit creates a new test instance per test method, so this state is never shared.
+  private final PendingAsyncOperations pendingOps =
+      new PendingAsyncOperations(miniStateBackend, miniStateBackend);
+
+  @Test
+  public void exampleUsage() {
+    pendingOps.add(FUNCTION_1_ADDR, 1, dummyMessage);
+    pendingOps.flush();
+
+    assertThat(miniStateBackend, matchesAddressState(FUNCTION_1_ADDR, hasKey(1L)));
+  }
+
+  @Test
+  public void itemsAreExplicitlyFlushed() {
+    pendingOps.add(FUNCTION_1_ADDR, 1, dummyMessage);
+
+    assertThat(miniStateBackend, not(matchesAddressState(FUNCTION_1_ADDR, hasKey(1L))));
+  }
+
+  @Test
+  public void completedItemsAreNotFlushed() {
+    pendingOps.add(FUNCTION_1_ADDR, 1, dummyMessage);
+    pendingOps.remove(FUNCTION_1_ADDR, 1);
+    pendingOps.flush();
+
+    assertThat(miniStateBackend, not(matchesAddressState(FUNCTION_1_ADDR, hasKey(1L))));
+  }
+
+  @Test
+  public void completingAFlushedItemRemovesItFromTheBackingStore() {
+    pendingOps.add(FUNCTION_1_ADDR, 1, dummyMessage);
+    pendingOps.add(FUNCTION_2_ADDR, 1, dummyMessage);
+    pendingOps.flush();
+
+    // remove() expects the key of the owning address to be already set by the caller.
+    miniStateBackend.setCurrentAddress(FUNCTION_1_ADDR);
+    pendingOps.remove(FUNCTION_1_ADDR, 1);
+
+    assertThat(miniStateBackend, not(matchesAddressState(FUNCTION_1_ADDR, hasKey(1L))));
+    assertThat(miniStateBackend, matchesAddressState(FUNCTION_2_ADDR, hasKey(1L)));
+  }
+
+  @Test
+  public void differentAddressesShouldBeFlushedToTheirStates() {
+    pendingOps.add(FUNCTION_1_ADDR, 1, dummyMessage);
+    pendingOps.add(FUNCTION_2_ADDR, 1, dummyMessage);
+    pendingOps.flush();
+
+    assertThat(
+        miniStateBackend,
+        allOf(
+            matchesAddressState(FUNCTION_1_ADDR, hasKey(1L)),
+            matchesAddressState(FUNCTION_2_ADDR, hasKey(1L))));
+  }
+
+  /** Matches a {@link MemoryMapState} whose map for {@code address} satisfies {@code matcher}. */
+  private static <K, V, M> Matcher<MemoryMapState<K, V>> matchesAddressState(
+      Address address, Matcher<M> matcher) {
+    return new TypeSafeMatcher<MemoryMapState<K, V>>() {
+      @Override
+      protected boolean matchesSafely(MemoryMapState<K, V> memoryMapState) {
+        return matcher.matches(memoryMapState.states.get(address));
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("state of " + address + " matching ").appendDescriptionOf(matcher);
+      }
+    };
+  }
+
+  /** An in-memory {@link MapState} holding one map per {@link Address}, like keyed state would. */
+  private static final class MemoryMapState<K, V> implements MapState<K, V>, Consumer<Address> {
+    Map<Address, Map<K, V>> states = new HashMap<>();
+    Address address;
+
+    @Override
+    public void accept(Address address) {
+      this.address = address;
+    }
+
+    public void setCurrentAddress(Address address) {
+      this.address = address;
+    }
+
+    public Map<K, V> perCurrentAddressState() {
+      assert address != null;
+      return states.computeIfAbsent(address, unused -> new HashMap<>());
+    }
+
+    @Override
+    public V get(K key) {
+      return perCurrentAddressState().get(key);
+    }
+
+    @Override
+    public void put(K key, V value) {
+      perCurrentAddressState().put(key, value);
+    }
+
+    @Override
+    public void putAll(Map<K, V> map) {
+      perCurrentAddressState().putAll(map);
+    }
+
+    @Override
+    public void remove(K key) {
+      perCurrentAddressState().remove(key);
+    }
+
+    @Override
+    public boolean contains(K key) {
+      return perCurrentAddressState().containsKey(key);
+    }
+
+    @Override
+    public Iterable<Entry<K, V>> entries() {
+      return perCurrentAddressState().entrySet();
+    }
+
+    @Override
+    public Iterable<K> keys() {
+      return perCurrentAddressState().keySet();
+    }
+
+    @Override
+    public Iterable<V> values() {
+      return perCurrentAddressState().values();
+    }
+
+    @Override
+    public Iterator<Entry<K, V>> iterator() {
+      return perCurrentAddressState().entrySet().iterator();
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return perCurrentAddressState().isEmpty();
+    }
+
+    @Override
+    public void clear() {
+      perCurrentAddressState().clear();
+    }
+  }
+}