Posted to commits@flink.apache.org by tz...@apache.org on 2020/07/13 04:40:07 UTC

[flink-statefun] branch master updated (8376afa -> 289c30e)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 8376afa  [FLINK-18016] [doc] Add documentation for UDS domain socket transport
     add 2693f95  [FLINK-18316] [core] Remove BoundState
     add 23c8df1  [FLINK-18316] [core] Refactor Flink StateBinder
     add 2eb4aab  [FLINK-18316] [sdk] Add SDK classes for PersistedStateRegistry
     add d54a66e  [FLINK-18316] [core] Let original Flink StateBinder implement SDK StateBinder
     add 54cc5e8  [FLINK-18316] [core] Wire in PersistedStateRegistry in PersistedStates.findAndBind
     add 414e446  [FLINK-17954] [core] Introduce PersistedRemoteFunctionValues
     add 48dc474  [FLINK-17954] [core] Use PersistedRemoteFunctionValues in RequestReplyFunction
     new 289c30e  [FLINK-17954] [core] Demux legacy remote function state on restore

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/core/StatefulFunctionsConfig.java        |  15 ++
 .../core/functions/FunctionGroupOperator.java      |  16 ++
 .../flink/core/functions/LiveFunction.java         |   4 -
 .../statefun/flink/core/functions/Reductions.java  |   4 +-
 .../functions/RemoteFunctionStateMigrator.java     | 148 +++++++++++++
 .../flink/core/functions/StatefulFunction.java     |  10 -
 .../core/functions/StatefulFunctionRepository.java |  12 +-
 .../flink/core/httpfn/HttpFunctionProvider.java    |   9 +-
 .../reqreply/PersistedRemoteFunctionValues.java    |  76 +++++++
 .../flink/core/reqreply/RequestReplyFunction.java  |  58 ++---
 .../statefun/flink/core/state/BoundState.java      |  87 --------
 .../statefun/flink/core/state/FlinkState.java      |   2 +-
 .../{StateBinder.java => FlinkStateBinder.java}    |  54 ++---
 .../statefun/flink/core/state/PersistedStates.java |  23 +-
 .../flink/statefun/sdk/state/ApiExtension.java     |   9 +
 .../functions/LocalStatefulFunctionGroupTest.java  |   7 -
 .../core/reqreply/RequestReplyFunctionTest.java    |   3 +-
 ...ateBinderTest.java => PersistedStatesTest.java} |  58 ++++-
 .../operator/StateBootstrapFunctionRegistry.java   |   9 +-
 .../statefun/sdk/state/PersistedStateRegistry.java | 246 +++++++++++++++++++++
 .../flink/statefun/sdk/state/StateBinder.java      |  42 ++++
 .../sdk/state/PersistedStateRegistryTest.java      |  28 ++-
 22 files changed, 700 insertions(+), 220 deletions(-)
 create mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RemoteFunctionStateMigrator.java
 create mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
 delete mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/BoundState.java
 rename statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/{StateBinder.java => FlinkStateBinder.java} (58%)
 rename statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/state/{StateBinderTest.java => PersistedStatesTest.java} (77%)
 create mode 100644 statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java
 create mode 100644 statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/StateBinder.java
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/sdk/state/ApiExtension.java => statefun-sdk/src/test/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistryTest.java (56%)


[flink-statefun] 01/01: [FLINK-17954] [core] Demux legacy remote function state on restore

Posted by tz...@apache.org.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 289c30e8cdb54d2504ee47a57858a1d179f9a540
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Jun 5 01:15:13 2020 +0800

    [FLINK-17954] [core] Demux legacy remote function state on restore
    
    This commit adds backwards compatibility for versions 2.1.x, by
    performing state migration to demultiplex the PersistedTable used for
    remote function states into PersistedValues.
    
    We intend to keep this backwards compatibility path for one version
    only. Therefore, the plan is to revert these changes for 2.3.x.
    This means if a user intends to upgrade from 2.1.x to 2.3.x, they need
    to first upgrade to 2.2.x.
    
    This closes #126.
    This closes #125.
---
 .../flink/core/StatefulFunctionsConfig.java        |  15 +++
 .../core/functions/FunctionGroupOperator.java      |  16 +++
 .../functions/RemoteFunctionStateMigrator.java     | 148 +++++++++++++++++++++
 .../flink/core/httpfn/HttpFunctionProvider.java    |   4 +
 .../statefun/flink/core/state/FlinkState.java      |   2 +-
 5 files changed, 184 insertions(+), 1 deletion(-)
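
The demultiplexing described in the commit message can be pictured with a
minimal plain-Java sketch. It is not the code from this commit: it uses
ordinary maps in place of Flink's MapState and ValueState, and the class
name DemuxSketch, the method demux, and the state names "seen_count" and
"removed_state" are hypothetical. It assumes the same rules as the migrator
in the diff: entries whose state name is still registered are carried over,
unregistered entries are dropped, and the legacy multiplexed state is
cleared afterwards.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Plain-Java sketch of the demux step; no Flink types are involved. */
final class DemuxSketch {

  /**
   * Splits one multiplexed map (state name -> bytes) into per-name values,
   * keeping only names that are still registered, then clears the legacy map.
   */
  static Map<String, byte[]> demux(
      Map<String, byte[]> multiplexed, Set<String> registeredNames) {
    Map<String, byte[]> demuxed = new HashMap<>();
    for (Map.Entry<String, byte[]> entry : multiplexed.entrySet()) {
      // Drop state that is no longer registered, otherwise carry it over.
      if (registeredNames.contains(entry.getKey())) {
        demuxed.put(entry.getKey(), entry.getValue());
      }
    }
    // The legacy multiplexed state is emptied once it has been migrated.
    multiplexed.clear();
    return demuxed;
  }

  public static void main(String[] args) {
    Map<String, byte[]> legacy = new LinkedHashMap<>();
    legacy.put("seen_count", new byte[] {1});
    legacy.put("removed_state", new byte[] {2});

    Map<String, byte[]> migrated = demux(legacy, Collections.singleton("seen_count"));
    System.out.println(migrated.keySet()); // prints [seen_count]
    System.out.println(legacy.isEmpty()); // prints true
  }
}
```

In the actual change this logic runs once per key of the keyed state
backend, and only when statefun.remote.migrate-legacy-state is set to true.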

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 293f917..5425d72 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -96,6 +96,13 @@ public class StatefulFunctionsConfig implements Serializable {
           .withDescription(
               "The max number of async operations per task before backpressure is applied.");
 
+  public static final ConfigOption<Boolean> MIGRATE_LEGACY_REMOTE_FN_STATE =
+      ConfigOptions.key("statefun.remote.migrate-legacy-state")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription(
+              "Indicates whether or not legacy remote function state should be migrated. This should be true only if you are restoring from a savepoint taken with version <= 2.1.x.");
+
   /**
    * Creates a new {@link StatefulFunctionsConfig} based on the default configurations in the
    * current environment set via the {@code flink-conf.yaml}.
@@ -130,6 +137,8 @@ public class StatefulFunctionsConfig implements Serializable {
 
   private int maxAsyncOperationsPerTask;
 
+  private boolean migrateLegacyRemoteFunctionState;
+
   private Map<String, String> globalConfigurations = new HashMap<>();
 
   /**
@@ -144,6 +153,7 @@ public class StatefulFunctionsConfig implements Serializable {
     this.flinkJobName = configuration.get(FLINK_JOB_NAME);
     this.feedbackBufferSize = configuration.get(TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING);
     this.maxAsyncOperationsPerTask = configuration.get(ASYNC_MAX_OPERATIONS_PER_TASK);
+    this.migrateLegacyRemoteFunctionState = configuration.get(MIGRATE_LEGACY_REMOTE_FN_STATE);
 
     for (String key : configuration.keySet()) {
       if (key.startsWith(MODULE_CONFIG_PREFIX)) {
@@ -194,6 +204,11 @@ public class StatefulFunctionsConfig implements Serializable {
     this.maxAsyncOperationsPerTask = maxAsyncOperationsPerTask;
   }
 
+  /** Flag indicating whether or not legacy remote function state should be migrated. */
+  public boolean shouldMigrateLegacyRemoteFnState() {
+    return this.migrateLegacyRemoteFunctionState;
+  }
+
   /**
    * Retrieves the universe provider for loading modules.
    *
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index a5d7335..5a369d1 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -36,6 +36,7 @@ import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureVa
 import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.message.MessageFactory;
+import org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -122,6 +123,21 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
             new MailboxExecutorFacade(mailboxExecutor, "Stateful Functions Mailbox"),
             getRuntimeContext().getMetricGroup().addGroup("functions"),
             asyncOperationState);
+
+    //
+    // De-multiplex legacy remote function state in versions <= 2.1.x
+    // TODO backwards compatibility path for 2.1.x supported only in 2.2.x, remove for 2.3.x
+    //
+    if (configuration.shouldMigrateLegacyRemoteFnState()) {
+      final DynamicallyRegisteredTypes dynamicallyRegisteredTypes =
+          new DynamicallyRegisteredTypes(statefulFunctionsUniverse.types());
+      RemoteFunctionStateMigrator.apply(
+          statefulFunctionsUniverse.functions(),
+          getKeyedStateBackend(),
+          dynamicallyRegisteredTypes.registerType(String.class),
+          dynamicallyRegisteredTypes.registerType(byte[].class));
+    }
+
     //
     // expire all the pending async operations.
     //
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RemoteFunctionStateMigrator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RemoteFunctionStateMigrator.java
new file mode 100644
index 0000000..1b4b33c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RemoteFunctionStateMigrator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.functions;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
+import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
+import org.apache.flink.statefun.flink.core.state.FlinkState;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+
+/**
+ * Performs state migration for legacy remote function state in StateFun versions <= 2.1.x.
+ *
+ * <p>TODO we plan to remove this backwards compatibility path in version 2.3.0, meaning that
+ * users who want to upgrade from 2.1.x to 2.3.x need to first upgrade to 2.2.x.
+ */
+final class RemoteFunctionStateMigrator
+    implements KeyedStateFunction<String, MapState<String, byte[]>> {
+
+  private static final String LEGACY_MUX_STATE_NAME = "states";
+
+  static void apply(
+      Map<FunctionType, StatefulFunctionProvider> functionProviders,
+      KeyedStateBackend<String> keyedStateBackend,
+      TypeInformation<String> keyTypeInfo,
+      TypeInformation<byte[]> valueTypeInfo)
+      throws Exception {
+    functionProviders.entrySet().stream()
+        .filter(RemoteFunctionStateMigrator::isRemoteFunctionProvider)
+        .forEach(
+            remoteFunctionProvider ->
+                migrateRemoteFunctionState(
+                    remoteFunctionProvider, keyedStateBackend, keyTypeInfo, valueTypeInfo));
+  }
+
+  private static boolean isRemoteFunctionProvider(
+      Map.Entry<FunctionType, StatefulFunctionProvider> functionProviderEntry) {
+    return functionProviderEntry.getValue() instanceof HttpFunctionProvider;
+  }
+
+  private static void migrateRemoteFunctionState(
+      Map.Entry<FunctionType, StatefulFunctionProvider> functionProviderEntry,
+      KeyedStateBackend<String> keyedStateBackend,
+      TypeInformation<String> keyTypeInfo,
+      TypeInformation<byte[]> valueTypeInfo) {
+    final FunctionType functionType = functionProviderEntry.getKey();
+    final HttpFunctionSpec functionSpec =
+        ((HttpFunctionProvider) functionProviderEntry.getValue()).getFunctionSpec(functionType);
+
+    try {
+      final RemoteFunctionStateMigrator stateMigrator =
+          new RemoteFunctionStateMigrator(
+              demuxValueStateHandles(
+                  functionSpec.states(), functionType, keyedStateBackend, valueTypeInfo));
+
+      keyedStateBackend.applyToAllKeys(
+          VoidNamespace.INSTANCE,
+          VoidNamespaceSerializer.INSTANCE,
+          multiplexedStateDescriptor(functionType, keyTypeInfo, valueTypeInfo),
+          stateMigrator);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Error migrating multiplexed state for remote function type " + functionType, e);
+    }
+  }
+
+  /** The value states to de-mux the multiplexed state into. */
+  private final Map<String, ValueState<byte[]>> demuxValueStates;
+
+  private RemoteFunctionStateMigrator(Map<String, ValueState<byte[]>> demuxValueStates) {
+    this.demuxValueStates = Objects.requireNonNull(demuxValueStates);
+  }
+
+  @Override
+  public void process(String key, MapState<String, byte[]> multiplexedState) throws Exception {
+    for (Map.Entry<String, byte[]> entry : multiplexedState.entries()) {
+      final String stateName = entry.getKey();
+      final byte[] value = entry.getValue();
+
+      final ValueState<byte[]> demuxState = demuxValueStates.get(stateName);
+      // drop state if it is no longer registered, otherwise migrate to value state
+      if (demuxState != null) {
+        demuxState.update(value);
+      }
+    }
+    multiplexedState.clear();
+  }
+
+  private static Map<String, ValueState<byte[]>> demuxValueStateHandles(
+      List<StateSpec> stateSpecs,
+      FunctionType functionType,
+      KeyedStateBackend<String> keyedStateBackend,
+      TypeInformation<byte[]> valueTypeInfo)
+      throws Exception {
+    final Map<String, ValueState<byte[]>> valueStates = new HashMap<>(stateSpecs.size());
+    for (StateSpec stateSpec : stateSpecs) {
+      valueStates.put(
+          stateSpec.name(),
+          keyedStateBackend.getOrCreateKeyedState(
+              VoidNamespaceSerializer.INSTANCE,
+              demuxValueStateDescriptor(functionType, stateSpec, valueTypeInfo)));
+    }
+    return valueStates;
+  }
+
+  private static ValueStateDescriptor<byte[]> demuxValueStateDescriptor(
+      FunctionType functionType, StateSpec stateSpec, TypeInformation<byte[]> valueTypeInfo) {
+    return new ValueStateDescriptor<>(
+        FlinkState.flinkStateName(functionType, stateSpec.name()), valueTypeInfo);
+  }
+
+  private static MapStateDescriptor<String, byte[]> multiplexedStateDescriptor(
+      FunctionType functionType,
+      TypeInformation<String> keyTypeInfo,
+      TypeInformation<byte[]> valueTypeInfo) {
+    return new MapStateDescriptor<>(
+        FlinkState.flinkStateName(functionType, LEGACY_MUX_STATE_NAME), keyTypeInfo, valueTypeInfo);
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index 0bc6de1..e8fc0a4 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -50,6 +50,10 @@ public class HttpFunctionProvider implements StatefulFunctionProvider {
         buildHttpClient(spec));
   }
 
+  public HttpFunctionSpec getFunctionSpec(FunctionType type) {
+    return supportedTypes.get(type);
+  }
+
   private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
     OkHttpClient.Builder clientBuilder = sharedClient.newBuilder();
     clientBuilder.callTimeout(spec.maxRequestDuration());
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
index b46a3be..10efa59 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
@@ -102,7 +102,7 @@ public final class FlinkState implements State {
     keyedStateBackend.setCurrentKey(KeyBy.apply(address));
   }
 
-  private static String flinkStateName(FunctionType functionType, String name) {
+  public static String flinkStateName(FunctionType functionType, String name) {
     return String.format("%s.%s.%s", functionType.namespace(), functionType.name(), name);
   }
 }
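
For orientation, the naming scheme that the migrator relies on can be
sketched as follows. This is a standalone illustration, not the FlinkState
class itself: it takes plain strings instead of a FunctionType, and the
namespace "example", the function name "greeter", and the state name
"seen_count" are hypothetical. Only the "%s.%s.%s" format and the legacy
state name "states" are taken from the diff above.

```java
/** Standalone sketch of the state naming scheme; plain strings stand in for FunctionType. */
final class StateNameSketch {

  /** Mirrors the "%s.%s.%s" format used by FlinkState.flinkStateName in the diff. */
  static String flinkStateName(String namespace, String functionName, String stateName) {
    return String.format("%s.%s.%s", namespace, functionName, stateName);
  }

  public static void main(String[] args) {
    // Name of the legacy multiplexed map state for a hypothetical function type.
    System.out.println(flinkStateName("example", "greeter", "states"));
    // prints example.greeter.states

    // Name of one demultiplexed value state for the same function type.
    System.out.println(flinkStateName("example", "greeter", "seen_count"));
    // prints example.greeter.seen_count
  }
}
```

Because both the legacy map state and the new value states are named through
the same helper, the migrator needs flinkStateName to be public, which is the
only change this commit makes to FlinkState.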