You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/07/13 04:40:08 UTC

[flink-statefun] 01/01: [FLINK-17954] [core] Demux legacy remote function state on restore

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 289c30e8cdb54d2504ee47a57858a1d179f9a540
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Jun 5 01:15:13 2020 +0800

    [FLINK-17954] [core] Demux legacy remote function state on restore
    
    This commit adds backwards compatibility with version 2.1.x by
    performing a state migration that demultiplexes the PersistedTable used
    for remote function state into PersistedValues.
    
    We intend to keep this backwards compatibility path for one version
    only; the plan is to revert these changes in 2.3.x. This means that
    users who want to upgrade from 2.1.x to 2.3.x need to first upgrade
    to 2.2.x.
    
    This closes #126.
    This closes #125.
---
 .../flink/core/StatefulFunctionsConfig.java        |  15 +++
 .../core/functions/FunctionGroupOperator.java      |  16 +++
 .../functions/RemoteFunctionStateMigrator.java     | 148 +++++++++++++++++++++
 .../flink/core/httpfn/HttpFunctionProvider.java    |   4 +
 .../statefun/flink/core/state/FlinkState.java      |   2 +-
 5 files changed, 184 insertions(+), 1 deletion(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 293f917..5425d72 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -96,6 +96,13 @@ public class StatefulFunctionsConfig implements Serializable {
           .withDescription(
               "The max number of async operations per task before backpressure is applied.");
 
+  public static final ConfigOption<Boolean> MIGRATE_LEGACY_REMOTE_FN_STATE =
+      ConfigOptions.key("statefun.remote.migrate-legacy-state").booleanType().defaultValue(false)
+          .withDescription(
+              "Indicates whether or not legacy remote function state should be migrated. "
+                  + "This should be true only if you are restoring from a savepoint taken "
+                  + "with version <= 2.1.x.");
+
   /**
    * Creates a new {@link StatefulFunctionsConfig} based on the default configurations in the
    * current environment set via the {@code flink-conf.yaml}.
@@ -130,6 +137,8 @@ public class StatefulFunctionsConfig implements Serializable {
 
   private int maxAsyncOperationsPerTask;
 
+  private boolean migrateLegacyRemoteFunctionState; // set from MIGRATE_LEGACY_REMOTE_FN_STATE
+
   private Map<String, String> globalConfigurations = new HashMap<>();
 
   /**
@@ -144,6 +153,7 @@ public class StatefulFunctionsConfig implements Serializable {
     this.flinkJobName = configuration.get(FLINK_JOB_NAME);
     this.feedbackBufferSize = configuration.get(TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING);
     this.maxAsyncOperationsPerTask = configuration.get(ASYNC_MAX_OPERATIONS_PER_TASK);
+    this.migrateLegacyRemoteFunctionState = configuration.get(MIGRATE_LEGACY_REMOTE_FN_STATE);
 
     for (String key : configuration.keySet()) {
       if (key.startsWith(MODULE_CONFIG_PREFIX)) {
@@ -194,6 +204,11 @@ public class StatefulFunctionsConfig implements Serializable {
     this.maxAsyncOperationsPerTask = maxAsyncOperationsPerTask;
   }
 
+  /** Whether legacy remote function state (StateFun 2.1.x or earlier) should be migrated. */
+  public boolean shouldMigrateLegacyRemoteFnState() {
+    return migrateLegacyRemoteFunctionState;
+  }
+
   /**
    * Retrieves the universe provider for loading modules.
    *
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index a5d7335..5a369d1 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -36,6 +36,7 @@ import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureVa
 import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.message.MessageFactory;
+import org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -122,6 +123,21 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
             new MailboxExecutorFacade(mailboxExecutor, "Stateful Functions Mailbox"),
             getRuntimeContext().getMetricGroup().addGroup("functions"),
             asyncOperationState);
+
+    //
+    // De-multiplex legacy remote function state in versions <= 2.1.x
+    // TODO backwards compatibility path for 2.1.x supported only in 2.2.x, remove for 2.3.x
+    //
+    if (configuration.shouldMigrateLegacyRemoteFnState()) {
+      final DynamicallyRegisteredTypes dynamicallyRegisteredTypes =
+          new DynamicallyRegisteredTypes(statefulFunctionsUniverse.types());
+      RemoteFunctionStateMigrator.apply(
+          statefulFunctionsUniverse.functions(),
+          getKeyedStateBackend(),
+          dynamicallyRegisteredTypes.registerType(String.class),
+          dynamicallyRegisteredTypes.registerType(byte[].class));
+    }
+
     //
     // expire all the pending async operations.
     //
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RemoteFunctionStateMigrator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RemoteFunctionStateMigrator.java
new file mode 100644
index 0000000..1b4b33c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RemoteFunctionStateMigrator.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.functions;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
+import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
+import org.apache.flink.statefun.flink.core.state.FlinkState;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+
+/**
+ * Performs state migration for legacy remote function state in StateFun versions <= 2.1.x.
+ *
+ * <p>In those versions, all state of a remote function was multiplexed into a single {@link
+ * MapState} registered as function state {@code "states"} and keyed by state name. This migrator
+ * copies each entry whose state name is still declared in the function's spec into a dedicated
+ * {@link ValueState}, and then clears the multiplexed state; undeclared entries are dropped.
+ *
+ * <p>TODO: we plan to remove this backwards compatibility path in version 2.3.0, meaning that
+ * users who want to upgrade from 2.1.x to 2.3.x will first need to upgrade to 2.2.x.
+ */
+final class RemoteFunctionStateMigrator
+    implements KeyedStateFunction<String, MapState<String, byte[]>> {
+
+  /** Function state name under which legacy versions multiplexed all remote function state. */
+  private static final String LEGACY_MUX_STATE_NAME = "states";
+
+  /**
+   * Migrates the legacy multiplexed state of every remote (HTTP) function in {@code
+   * functionProviders} into per-state value states, for all keys.
+   *
+   * @param functionProviders all function providers; only {@link HttpFunctionProvider}s are
+   *     migrated.
+   * @param keyedStateBackend the backend holding both the legacy and the migrated state.
+   * @param keyTypeInfo type information of the legacy map state's keys, i.e. the state names.
+   * @param valueTypeInfo type information of the state values.
+   * @throws RuntimeException if migrating the state of a remote function type fails.
+   */
+  static void apply(
+      Map<FunctionType, StatefulFunctionProvider> functionProviders,
+      KeyedStateBackend<String> keyedStateBackend,
+      TypeInformation<String> keyTypeInfo,
+      TypeInformation<byte[]> valueTypeInfo) {
+    functionProviders.entrySet().stream()
+        .filter(RemoteFunctionStateMigrator::isRemoteFunctionProvider)
+        .forEach(
+            remoteFunctionProvider ->
+                migrateRemoteFunctionState(
+                    remoteFunctionProvider, keyedStateBackend, keyTypeInfo, valueTypeInfo));
+  }
+
+  /** Returns whether the given entry's provider serves remote (HTTP) functions. */
+  private static boolean isRemoteFunctionProvider(
+      Map.Entry<FunctionType, StatefulFunctionProvider> functionProviderEntry) {
+    return functionProviderEntry.getValue() instanceof HttpFunctionProvider;
+  }
+
+  /**
+   * Migrates the legacy multiplexed state of a single remote function type, for all keys.
+   *
+   * @throws RuntimeException wrapping the original failure, if the migration fails.
+   */
+  private static void migrateRemoteFunctionState(
+      Map.Entry<FunctionType, StatefulFunctionProvider> functionProviderEntry,
+      KeyedStateBackend<String> keyedStateBackend,
+      TypeInformation<String> keyTypeInfo,
+      TypeInformation<byte[]> valueTypeInfo) {
+    final FunctionType functionType = functionProviderEntry.getKey();
+    final HttpFunctionSpec functionSpec =
+        ((HttpFunctionProvider) functionProviderEntry.getValue()).getFunctionSpec(functionType);
+
+    try {
+      final RemoteFunctionStateMigrator stateMigrator =
+          new RemoteFunctionStateMigrator(
+              demuxValueStateHandles(
+                  functionSpec.states(), functionType, keyedStateBackend, valueTypeInfo));
+
+      keyedStateBackend.applyToAllKeys(
+          VoidNamespace.INSTANCE,
+          VoidNamespaceSerializer.INSTANCE,
+          multiplexedStateDescriptor(functionType, keyTypeInfo, valueTypeInfo),
+          stateMigrator);
+    } catch (Exception e) {
+      // chain the original exception so that the root cause is not lost
+      throw new RuntimeException(
+          "Error migrating multiplexed state for remote function type " + functionType, e);
+    }
+  }
+
+  /** The value states to de-mux the multiplexed state into, keyed by state name. */
+  private final Map<String, ValueState<byte[]>> demuxValueStates;
+
+  private RemoteFunctionStateMigrator(Map<String, ValueState<byte[]>> demuxValueStates) {
+    this.demuxValueStates = Objects.requireNonNull(demuxValueStates);
+  }
+
+  /**
+   * Copies each entry of {@code key}'s multiplexed state into the value state registered for its
+   * state name, then clears the multiplexed state.
+   */
+  @Override
+  public void process(String key, MapState<String, byte[]> multiplexedState) throws Exception {
+    for (Map.Entry<String, byte[]> entry : multiplexedState.entries()) {
+      final String stateName = entry.getKey();
+      final byte[] value = entry.getValue();
+
+      final ValueState<byte[]> demuxState = demuxValueStates.get(stateName);
+      // drop state if it is no longer registered, otherwise migrate to value state
+      if (demuxState != null) {
+        demuxState.update(value);
+      }
+    }
+    multiplexedState.clear();
+  }
+
+  /**
+   * Obtains a value state handle for every state declared in {@code stateSpecs}.
+   *
+   * @return the value state handles, keyed by state name.
+   */
+  private static Map<String, ValueState<byte[]>> demuxValueStateHandles(
+      List<StateSpec> stateSpecs,
+      FunctionType functionType,
+      KeyedStateBackend<String> keyedStateBackend,
+      TypeInformation<byte[]> valueTypeInfo)
+      throws Exception {
+    final Map<String, ValueState<byte[]>> valueStates = new HashMap<>(stateSpecs.size());
+    for (StateSpec stateSpec : stateSpecs) {
+      valueStates.put(
+          stateSpec.name(),
+          keyedStateBackend.getOrCreateKeyedState(
+              VoidNamespaceSerializer.INSTANCE,
+              demuxValueStateDescriptor(functionType, stateSpec, valueTypeInfo)));
+    }
+    return valueStates;
+  }
+
+  /** Descriptor of the value state that one declared remote function state is migrated into. */
+  private static ValueStateDescriptor<byte[]> demuxValueStateDescriptor(
+      FunctionType functionType, StateSpec stateSpec, TypeInformation<byte[]> valueTypeInfo) {
+    return new ValueStateDescriptor<>(
+        FlinkState.flinkStateName(functionType, stateSpec.name()), valueTypeInfo);
+  }
+
+  /** Descriptor of the legacy map state that multiplexed all state of a remote function. */
+  private static MapStateDescriptor<String, byte[]> multiplexedStateDescriptor(
+      FunctionType functionType,
+      TypeInformation<String> keyTypeInfo,
+      TypeInformation<byte[]> valueTypeInfo) {
+    return new MapStateDescriptor<>(
+        FlinkState.flinkStateName(functionType, LEGACY_MUX_STATE_NAME), keyTypeInfo, valueTypeInfo);
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index 0bc6de1..e8fc0a4 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -50,6 +50,10 @@ public class HttpFunctionProvider implements StatefulFunctionProvider {
         buildHttpClient(spec));
   }
 
+  public HttpFunctionSpec getFunctionSpec(FunctionType type) { // used by state migration
+    return this.supportedTypes.get(type); // NOTE(review): presumably null for unknown types
+  }
+
   private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
     OkHttpClient.Builder clientBuilder = sharedClient.newBuilder();
     clientBuilder.callTimeout(spec.maxRequestDuration());
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
index b46a3be..10efa59 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
@@ -102,7 +102,8 @@ public final class FlinkState implements State {
     keyedStateBackend.setCurrentKey(KeyBy.apply(address));
   }

-  private static String flinkStateName(FunctionType functionType, String name) {
+  /** Returns the Flink state name {@code <namespace>.<function name>.<state name>}. */
+  public static String flinkStateName(FunctionType functionType, String name) {
     return String.format("%s.%s.%s", functionType.namespace(), functionType.name(), name);
   }
 }