You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/26 07:55:45 UTC
[flink-statefun] 01/06: [FLINK-16244] Add PendingAsyncOperations
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 165e5bf2862e512ae0820c49fae84af700b8904e
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Mon Feb 24 21:56:21 2020 +0100
[FLINK-16244] Add PendingAsyncOperations
---
.../core/functions/PendingAsyncOperations.java | 131 ++++++++++++++
.../core/functions/PendingAsyncOperationsTest.java | 189 +++++++++++++++++++++
2 files changed, 320 insertions(+)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PendingAsyncOperations.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PendingAsyncOperations.java
new file mode 100644
index 0000000..29f6c85
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/PendingAsyncOperations.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.functions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.statefun.flink.core.di.Inject;
+import org.apache.flink.statefun.flink.core.di.Label;
+import org.apache.flink.statefun.flink.core.message.Message;
+import org.apache.flink.statefun.flink.core.state.State;
+import org.apache.flink.statefun.sdk.Address;
+
+final class PendingAsyncOperations { // buffers uncompleted async operations in memory until flush()
+
+ /**
+ * Holds the in-flight, not yet checkpointed async operations: owning address, then futureId.
+ *
+ * <p>An entry can be removed from this map in two ways:
+ *
+ * <ul>
+ * <li>{@link #remove(Address, long)} was called explicitly (on async operation completion).
+ * <li>A checkpoint happens while the async operation is in flight ({@link #flush()} is called).
+ * In that case the entry is removed from the memoryStore, and written to the backingStore.
+ * </ul>
+ */
+ private final Map<Address, Map<Long, Message>> memoryStore = new HashMap<>();
+
+ /** The underlying backing state handle that {@link #flush()} writes the in-memory operations to. */
+ private final MapState<Long, Message> backingStore;
+
+ private final Consumer<Address> keySetter; // invoked with an address right before its operations are flushed
+
+ @Inject
+ PendingAsyncOperations(
+ @Label("state") State state,
+ @Label("async-operations") MapState<Long, Message> backingStore) {
+ this(state::setCurrentKey, backingStore);
+ }
+
+ @VisibleForTesting
+ PendingAsyncOperations(Consumer<Address> keySetter, MapState<Long, Message> backingStore) {
+ this.backingStore = Objects.requireNonNull(backingStore); // both arguments are mandatory
+ this.keySetter = Objects.requireNonNull(keySetter);
+ }
+
+ /**
+ * Adds an uncompleted async operation to the memoryStore, replacing any entry with the same futureId.
+ *
+ * @param owningAddress the address that had registered the async operation
+ * @param futureId the futureId that is associated with that operation
+ * @param message the message that was registered with that operation
+ */
+ void add(Address owningAddress, long futureId, Message message) {
+ Map<Long, Message> asyncOps =
+ memoryStore.computeIfAbsent(owningAddress, unused -> new HashMap<>());
+ asyncOps.put(futureId, message);
+ }
+
+ /**
+ * Removes the completed async operation from the memoryStore or, if absent there, the backingStore.
+ *
+ * <p>NOTE: this method does not set the key itself, so it should be called with {@link
+ * org.apache.flink.statefun.flink.core.state.State#setCurrentKey(Address)} set on the
+ * owningAddress. This should be the case as it is called by {@link
+ * AsyncMessageDecorator#postApply()}.
+ */
+ void remove(Address owningAddress, long futureId) {
+ Map<Long, Message> asyncOps = memoryStore.get(owningAddress);
+ if (asyncOps == null) {
+ // there are no async operations for this address in the memory store,
+ // therefore it must have been previously flushed to the backing store.
+ removeFromTheBackingStore(owningAddress, futureId);
+ return;
+ }
+ Message message = asyncOps.remove(futureId);
+ if (message == null) {
+ // this futureId is not in the memory store, so it was flushed to the backing store.
+ removeFromTheBackingStore(owningAddress, futureId);
+ }
+ if (asyncOps.isEmpty()) {
+ // asyncOps has become empty after removing futureId,
+ // drop it so that memoryStore does not hold on to empty per-address maps.
+ memoryStore.remove(owningAddress);
+ }
+ }
+
+ /** Moves the contents of the memoryStore into the backingStore, address by address, then clears it. */
+ void flush() {
+ memoryStore.forEach(this::flushState);
+ memoryStore.clear();
+ }
+
+ private void flushState(Address address, Map<Long, Message> perAddressState) {
+ keySetter.accept(address); // select the address before writing its operations
+ try {
+ backingStore.putAll(perAddressState);
+ } catch (Exception e) {
+ throw new IllegalStateException( // NOTE(review): "persisted" below is presumably a typo for "persist"
+ "Unable to persisted a previously registered asynchronous operation for " + address, e);
+ }
+ }
+
+ private void removeFromTheBackingStore(Address address, long futureId) { // address only feeds the error message
+ try {
+ this.backingStore.remove(futureId); // no keySetter call here; relies on the caller (see remove's NOTE)
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Unable to remove a registered asynchronous operation for " + address, e);
+ }
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/PendingAsyncOperationsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/PendingAsyncOperationsTest.java
new file mode 100644
index 0000000..62a75db
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/PendingAsyncOperationsTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.functions;
+
+import static org.apache.flink.statefun.flink.core.TestUtils.FUNCTION_1_ADDR;
+import static org.apache.flink.statefun.flink.core.TestUtils.FUNCTION_2_ADDR;
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Consumer;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.statefun.flink.core.TestUtils;
+import org.apache.flink.statefun.flink.core.message.Message;
+import org.apache.flink.statefun.sdk.Address;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+public class PendingAsyncOperationsTest { // exercises add/remove/flush against an in-memory MapState
+ private final MemoryMapState<Long, Message> miniStateBackend = new MemoryMapState<>(); // used as both key setter and MapState
+ private final Message dummyMessage = TestUtils.ENVELOPE_FACTORY.from(1);
+
+ @Test
+ public void exampleUsage() { // add followed by flush() makes the operation visible in the address' state
+ PendingAsyncOperations pendingOps =
+ new PendingAsyncOperations(miniStateBackend, miniStateBackend);
+
+ miniStateBackend.setCurrentAddress(FUNCTION_1_ADDR);
+ pendingOps.add(FUNCTION_1_ADDR, 1, dummyMessage);
+ pendingOps.flush();
+
+ assertThat(miniStateBackend, matchesAddressState(FUNCTION_1_ADDR, hasKey(1L)));
+ }
+
+ @Test
+ public void itemsAreExplicitlyFlushed() { // without flush(), an added operation does not reach the state
+ PendingAsyncOperations pendingOps =
+ new PendingAsyncOperations(miniStateBackend, miniStateBackend);
+
+ miniStateBackend.setCurrentAddress(FUNCTION_1_ADDR);
+ pendingOps.add(FUNCTION_1_ADDR, 1, dummyMessage);
+
+ assertThat(miniStateBackend, not(matchesAddressState(FUNCTION_1_ADDR, hasKey(1L))));
+ }
+
+ @Test
+ public void inFlightItemsDoNotFlush() { // an operation removed before flush() is never written to the state
+ PendingAsyncOperations pendingOps =
+ new PendingAsyncOperations(miniStateBackend, miniStateBackend);
+
+ miniStateBackend.setCurrentAddress(FUNCTION_1_ADDR);
+ pendingOps.add(FUNCTION_1_ADDR, 1, dummyMessage);
+ pendingOps.remove(FUNCTION_1_ADDR, 1);
+ pendingOps.flush();
+
+ assertThat(miniStateBackend, not(matchesAddressState(FUNCTION_1_ADDR, hasKey(1L))));
+ }
+
+ @Test
+ public void differentAddressesShouldBeFlushedToTheirStates() { // same futureId, two addresses, two states
+ PendingAsyncOperations pendingOps =
+ new PendingAsyncOperations(miniStateBackend, miniStateBackend);
+
+ miniStateBackend.setCurrentAddress(FUNCTION_1_ADDR);
+ pendingOps.add(FUNCTION_1_ADDR, 1, dummyMessage);
+
+ miniStateBackend.setCurrentAddress(FUNCTION_2_ADDR);
+ pendingOps.add(FUNCTION_2_ADDR, 1, dummyMessage);
+
+ pendingOps.flush();
+
+ assertThat(
+ miniStateBackend,
+ allOf(
+ matchesAddressState(FUNCTION_1_ADDR, hasKey(1L)),
+ matchesAddressState(FUNCTION_2_ADDR, hasKey(1L))));
+ }
+
+ private static <K, V, M> Matcher<MemoryMapState<K, V>> matchesAddressState(
+ Address address, Matcher<M> matcher) { // matches when matcher accepts the map stored for address
+ return new TypeSafeMatcher<MemoryMapState<K, V>>() {
+ @Override
+ protected boolean matchesSafely(MemoryMapState<K, V> memoryMapState) {
+ return matcher.matches(memoryMapState.states.get(address)); // get yields null if no map exists for address
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ matcher.describeTo(description); // description is delegated to the wrapped matcher
+ }
+ };
+ }
+
+ private static final class MemoryMapState<K, V> implements MapState<K, V>, Consumer<Address> { // test double
+ Map<Address, Map<K, V>> states = new HashMap<>(); // one map per address
+ Address address; // the address whose map the MapState methods below operate on
+
+ @Override
+ public void accept(Address address) { // Consumer<Address> entry point; same effect as setCurrentAddress
+ this.address = address;
+ }
+
+ public void setCurrentAddress(Address address) {
+ this.address = address;
+ }
+
+ public Map<K, V> perCurrentAddressState() { // every MapState method below goes through this lookup
+ assert address != null; // only checked when assertions are enabled
+ return states.computeIfAbsent(address, unused -> new HashMap<>()); // creates the map on first access
+ }
+
+ @Override
+ public V get(K key) {
+ return perCurrentAddressState().get(key);
+ }
+
+ @Override
+ public void put(K key, V value) {
+ perCurrentAddressState().put(key, value);
+ }
+
+ @Override
+ public void putAll(Map<K, V> map) {
+ perCurrentAddressState().putAll(map);
+ }
+
+ @Override
+ public void remove(K key) {
+ perCurrentAddressState().remove(key);
+ }
+
+ @Override
+ public boolean contains(K key) {
+ return perCurrentAddressState().containsKey(key);
+ }
+
+ @Override
+ public Iterable<Entry<K, V>> entries() {
+ return perCurrentAddressState().entrySet();
+ }
+
+ @Override
+ public Iterable<K> keys() {
+ return perCurrentAddressState().keySet();
+ }
+
+ @Override
+ public Iterable<V> values() {
+ return perCurrentAddressState().values();
+ }
+
+ @Override
+ public Iterator<Entry<K, V>> iterator() {
+ return perCurrentAddressState().entrySet().iterator();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return perCurrentAddressState().isEmpty();
+ }
+
+ @Override
+ public void clear() {
+ perCurrentAddressState().clear();
+ }
+ }
+}