You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/21 15:55:37 UTC

[flink-statefun] 01/03: [FLINK-16149] Move feedback buffer size config to StatefulFunctionsConfig

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 6f6f511d27343faf785d423a0f536ec5bc5a8865
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Feb 21 23:22:23 2020 +0800

    [FLINK-16149] Move feedback buffer size config to StatefulFunctionsConfig
---
 .../flink/core/StatefulFunctionsConfig.java        | 21 ++++++++++++++
 .../flink/core/feedback/FeedbackConfiguration.java | 33 ----------------------
 .../feedback/FeedbackUnionOperatorFactory.java     | 13 ++++-----
 .../flink/core/translation/FlinkUniverse.java      |  2 +-
 .../flink/core/StatefulFunctionsConfigTest.java    |  5 ++++
 5 files changed, 33 insertions(+), 41 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 0f3358b..04c2c99 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -30,6 +30,7 @@ import java.util.Objects;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
@@ -79,6 +80,13 @@ public class StatefulFunctionsConfig implements Serializable {
           .defaultValue("StatefulFunctions")
           .withDescription("The name to display at the Flink-UI");
 
+  public static final ConfigOption<MemorySize> TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING =
+      ConfigOptions.key("statefun.feedback.memory.size")
+          .memoryType()
+          .defaultValue(MemorySize.ofMebiBytes(32))
+          .withDescription(
+              "The number of bytes to use for in memory buffering of the feedback channel, before spilling to disk.");
+
   /**
    * Creates a new {@link StatefulFunctionsConfig} based on the default configurations in the
    * current environment set via the {@code flink-conf.yaml}.
@@ -107,6 +115,8 @@ public class StatefulFunctionsConfig implements Serializable {
 
   private byte[] universeInitializerClassBytes;
 
+  private MemorySize feedbackBufferSize;
+
   private Map<String, String> globalConfigurations = new HashMap<>();
 
   /**
@@ -117,6 +127,7 @@ public class StatefulFunctionsConfig implements Serializable {
   public StatefulFunctionsConfig(Configuration configuration) {
     this.factoryType = configuration.get(USER_MESSAGE_SERIALIZER);
     this.flinkJobName = configuration.get(FLINK_JOB_NAME);
+    this.feedbackBufferSize = configuration.get(TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING);
 
     for (String key : configuration.keySet()) {
       if (key.startsWith(MODULE_CONFIG_PREFIX)) {
@@ -152,6 +163,16 @@ public class StatefulFunctionsConfig implements Serializable {
     this.flinkJobName = Objects.requireNonNull(flinkJobName);
   }
 
+  /** Returns the number of bytes to use for in-memory buffering of the feedback channel. */
+  public MemorySize getFeedbackBufferSize() {
+    return feedbackBufferSize;
+  }
+
+  /** Sets the number of bytes to use for in-memory buffering of the feedback channel. */
+  public void setFeedbackBufferSize(MemorySize size) {
+    this.feedbackBufferSize = Objects.requireNonNull(size);
+  }
+
   /**
    * Retrieves the universe provider for loading modules.
    *
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackConfiguration.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackConfiguration.java
deleted file mode 100644
index 6ec9475..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackConfiguration.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.flink.core.feedback;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.MemorySize;
-
-public final class FeedbackConfiguration {
-  private FeedbackConfiguration() {}
-
-  public static final ConfigOption<MemorySize> TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING =
-      ConfigOptions.key("statefun.feedback.memory.size")
-          .memoryType()
-          .defaultValue(MemorySize.ofMebiBytes(32))
-          .withDescription(
-              "The number of bytes to use for in memory buffering of the feedback channel, before spilling to disk");
-}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
index 67670a9..adf693f 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
@@ -19,7 +19,7 @@ package org.apache.flink.statefun.flink.core.feedback;
 
 import java.util.Objects;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.common.SerializableFunction;
 import org.apache.flink.statefun.flink.core.common.SerializablePredicate;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -32,6 +32,8 @@ public final class FeedbackUnionOperatorFactory<E>
 
   private static final long serialVersionUID = 1;
 
+  private final StatefulFunctionsConfig configuration;
+
   private final FeedbackKey<E> feedbackKey;
   private final SerializablePredicate<E> isBarrierMessage;
   private final SerializableFunction<E, ?> keySelector;
@@ -40,12 +42,14 @@ public final class FeedbackUnionOperatorFactory<E>
   private transient ChainingStrategy chainingStrategy;
 
   public FeedbackUnionOperatorFactory(
+      StatefulFunctionsConfig configuration,
       FeedbackKey<E> feedbackKey,
       SerializablePredicate<E> isBarrierMessage,
       SerializableFunction<E, ?> keySelector) {
     this.feedbackKey = Objects.requireNonNull(feedbackKey);
     this.isBarrierMessage = Objects.requireNonNull(isBarrierMessage);
     this.keySelector = Objects.requireNonNull(keySelector);
+    this.configuration = Objects.requireNonNull(configuration);
   }
 
   @Override
@@ -55,17 +59,12 @@ public final class FeedbackUnionOperatorFactory<E>
     final TypeSerializer<E> serializer =
         config.getTypeSerializerIn1(containingTask.getUserCodeClassLoader());
 
-    final MemorySize totalMemoryUsedForFeedbackCheckpointing =
-        config
-            .getConfiguration()
-            .get(FeedbackConfiguration.TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING);
-
     FeedbackUnionOperator<E> op =
         new FeedbackUnionOperator<>(
             feedbackKey,
             isBarrierMessage,
             keySelector,
-            totalMemoryUsedForFeedbackCheckpointing.getBytes(),
+            configuration.getFeedbackBufferSize().getBytes(),
             serializer,
             mailboxExecutor);
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
index acfeab6..ff8df49 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
@@ -75,7 +75,7 @@ public final class FlinkUniverse {
 
     FeedbackUnionOperatorFactory<Message> factory =
         new FeedbackUnionOperatorFactory<>(
-            FEEDBACK_KEY, new IsCheckpointBarrier(), new FeedbackKeySelector());
+            configuration, FEEDBACK_KEY, new IsCheckpointBarrier(), new FeedbackKeySelector());
 
     return input
         .keyBy(new MessageKeySelector())
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
index c5c8b54..540739e 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.statefun.flink.core;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
@@ -33,6 +34,9 @@ public class StatefulFunctionsConfigTest {
     configuration.set(StatefulFunctionsConfig.FLINK_JOB_NAME, testName);
     configuration.set(
         StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
+    configuration.set(
+        StatefulFunctionsConfig.TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING,
+        MemorySize.ofMebiBytes(100));
     configuration.setString("statefun.module.global-config.key1", "value1");
     configuration.setString("statefun.module.global-config.key2", "value2");
 
@@ -40,6 +44,7 @@ public class StatefulFunctionsConfigTest {
 
     Assert.assertEquals(stateFunConfig.getFlinkJobName(), testName);
     Assert.assertEquals(stateFunConfig.getFactoryType(), MessageFactoryType.WITH_KRYO_PAYLOADS);
+    Assert.assertEquals(stateFunConfig.getFeedbackBufferSize(), MemorySize.ofMebiBytes(100));
     Assert.assertThat(
         stateFunConfig.getGlobalConfigurations(), Matchers.hasEntry("key1", "value1"));
     Assert.assertThat(