Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/19 15:57:55 UTC

[GitHub] [flink-statefun] sjwiesman opened a new pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

sjwiesman opened a new pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27
 
 
   This PR updates the StateFun runtime to pull all configurations from the flink-conf. Configurations no longer need to be set from the CLI (which is not easily accessible when using the Docker image), and the same global configurations are passed to all calls of `StatefulFunctionModule#configure`.
   
   Specific changes are documented in the commit messages. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink-statefun] igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381933672
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
 ##########
 @@ -17,59 +17,62 @@
  */
 package org.apache.flink.statefun.flink.core;
 
+import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+@SuppressWarnings("JavaReflectionMemberAccess")
 public class StatefulFunctionsJob {
 
+  private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
 
 Review comment:
   In addition, can you please explain why "../conf" is a fallback?
   I guess it has something to do with Maven and `target`?
   


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382100607
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
 ##########
 @@ -17,59 +17,62 @@
  */
 package org.apache.flink.statefun.flink.core;
 
+import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+@SuppressWarnings("JavaReflectionMemberAccess")
 public class StatefulFunctionsJob {
 
+  private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+  private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
   public static void main(String... args) throws Exception {
     ParameterTool parameterTool = ParameterTool.fromArgs(args);
-    Configuration configuration = parameterTool.getConfiguration();
+    Map<String, String> globalConfigurations = parameterTool.toMap();
+
+    String configDirectory = getConfigurationDirectoryFromEnv();
+    Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+    StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+    stateFunConfig.setGlobalConfigurations(globalConfigurations);
+    stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
 
-    main(configuration);
+    main(stateFunConfig, new Configuration());
 
 Review comment:
   The source of my confusion is the `new Configuration()`:
   it is empty, while we could use the one that you've just obtained from `flink-conf.yaml`.
   The reason I'm asking is that down the line you'll be calling
   `env.configure(..)`. Wouldn't it be better to pass the configuration parsed from flink-conf?


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381932931
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
 ##########
 @@ -17,59 +17,62 @@
  */
 package org.apache.flink.statefun.flink.core;
 
+import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+@SuppressWarnings("JavaReflectionMemberAccess")
 public class StatefulFunctionsJob {
 
+  private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
 
 Review comment:
   Can all the `flink-conf.yaml` related search and parsing be extracted to an external class,
   perhaps ConfUtils or something like that?
   Also, it should probably log where it got its configuration from.


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381955581
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
 ##########
 @@ -82,14 +85,37 @@ private static void setDefaultContextClassLoaderIfAbsent() {
     }
   }
 
-  private static void setDefaultProviderIfAbsent(
-      Configuration configuration, StatefulFunctionsUniverseProvider provider) {
-    if (!configuration.contains(
-        StatefulFunctionsJobConstants.STATEFUL_FUNCTIONS_UNIVERSE_INITIALIZER_CLASS_BYTES)) {
-      ConfigurationUtil.storeSerializedInstance(
-          configuration,
-          StatefulFunctionsJobConstants.STATEFUL_FUNCTIONS_UNIVERSE_INITIALIZER_CLASS_BYTES,
-          provider);
+  /**
+   * Finds the location of the flink-conf. The fallback keys are required to find the configuration
+   * in non-image based deployments; (i.e., anything using the flink cli).
+   *
+   * <p>Taken from org.apache.flink.client.cli.CliFrontend
+   */
+  private static String getConfigurationDirectoryFromEnv() {
+    String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+
+    if (location != null) {
+      if (new File(location).exists()) {
+        return location;
+      } else {
+        throw new RuntimeException(
+            "The configuration directory '"
+                + location
+                + "', specified in the '"
+                + ConfigConstants.ENV_FLINK_CONF_DIR
+                + "' environment variable, does not exist.");
+      }
+    } else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
 
 Review comment:
   I think it would be better to add a log message here that says which configuration value was selected:
   the environment variable or one of the fallbacks.
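
The lookup order in the diff above, together with the requested log message, could be sketched roughly as follows. This is a minimal sketch and not the PR's code: the class name `ConfigDirectoryLookupSketch`, the `resolve` method, the injected `exists` check, and the behaviour when no directory is found are all assumptions made for illustration. Only the two fallback paths and the environment-variable-first order come from the diff.

```java
import java.util.function.Predicate;
import java.util.logging.Logger;

// Hypothetical helper, not part of the PR: resolves the configuration
// directory and logs which source (environment or fallback) was selected.
final class ConfigDirectoryLookupSketch {

  private static final Logger LOG =
      Logger.getLogger(ConfigDirectoryLookupSketch.class.getName());

  // Fallback locations as they appear in the diff.
  static final String FALLBACK_1 = "../conf";
  static final String FALLBACK_2 = "conf";

  /**
   * @param envValue value of the configuration-directory environment variable, or null if unset
   * @param exists check for whether a directory exists (injected so the sketch is testable)
   */
  static String resolve(String envValue, Predicate<String> exists) {
    if (envValue != null) {
      if (!exists.test(envValue)) {
        throw new IllegalStateException(
            "The configuration directory '" + envValue
                + "', specified in the environment, does not exist.");
      }
      LOG.info("Using configuration directory '" + envValue + "' from the environment.");
      return envValue;
    }
    for (String fallback : new String[] {FALLBACK_1, FALLBACK_2}) {
      if (exists.test(fallback)) {
        LOG.info("Using fallback configuration directory '" + fallback + "'.");
        return fallback;
      }
    }
    // Assumption: the diff is cut off before this case, so failing here is a guess.
    throw new IllegalStateException("No configuration directory could be found.");
  }

  public static void main(String[] args) {
    // Stubbed existence check: only the second fallback "exists".
    System.out.println(resolve(null, "conf"::equals));
  }
}
```

In a real deployment the `exists` argument would presumably be something like `path -> new java.io.File(path).exists()`; it is a parameter here only so the selection logic can be exercised without touching the file system.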


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381927045
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
 ##########
 @@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+import org.apache.flink.util.InstantiationUtil;
+
+/** Configuration that captures all stateful function related settings. */
+@SuppressWarnings("WeakerAccess")
+public class StatefulFunctionsConfig implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final String MODULE_CONFIG_PREFIX = "statefun.module.config.";
+
+  // This configuration option exists for the documentation generator
+  @SuppressWarnings("unused")
+  public static final ConfigOption<String> MODULE_GLOBAL_DEFAULT =
+      ConfigOptions.key(MODULE_CONFIG_PREFIX + "<KEY>")
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              Description.builder()
+                  .text(
+                      "Adds the given key/value pair to the Stateful Functions global configuration.")
+                  .text(
+                      "These values will be available via the `globalConfigurations` parameter of StatefulFunctionModule#configure.")
+                  .linebreak()
+                  .text(
+                      "Only the key <KEY> and value are added to the configuration. If the key/value pairs")
+                  .list(
+                      code(MODULE_CONFIG_PREFIX + "key1: value1"),
+                      code(MODULE_CONFIG_PREFIX + "key2: value2"))
+                  .text("are set, then the map")
+                  .list(code("key1: value1"), code("key2: value2"))
+                  .text("will be made available to your module at runtime.")
+                  .build());
+
+  public static final ConfigOption<MessageFactoryType> USER_MESSAGE_SERIALIZER =
+      ConfigOptions.key("statefun.message.serializer")
 
 Review comment:
   I guess we don't need the `statefun` prefix now, since we have the `statefun.module.config.`
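
For readers following the option description in the diff above, the `statefun.module.config.` behaviour it documents (only the part of the key after the prefix is kept) can be illustrated with plain maps. This is a sketch under stated assumptions: `ModuleConfigPrefixSketch` and `extractGlobalConfigurations` are hypothetical names, and the PR's actual implementation inside `StatefulFunctionsConfig` is not shown in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the documented prefix semantics; not the PR's code.
final class ModuleConfigPrefixSketch {

  // Prefix as defined in the diff.
  static final String MODULE_CONFIG_PREFIX = "statefun.module.config.";

  /** Keeps only the prefixed entries and strips the prefix from each key. */
  static Map<String, String> extractGlobalConfigurations(Map<String, String> flinkConf) {
    Map<String, String> global = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(MODULE_CONFIG_PREFIX)) {
        global.put(key.substring(MODULE_CONFIG_PREFIX.length()), entry.getValue());
      }
    }
    return global;
  }

  public static void main(String[] args) {
    Map<String, String> flinkConf = new LinkedHashMap<>();
    flinkConf.put("statefun.module.config.key1", "value1");
    flinkConf.put("statefun.module.config.key2", "value2");
    flinkConf.put("parallelism.default", "4"); // unrelated Flink option, ignored
    System.out.println(extractGlobalConfigurations(flinkConf));
    // prints {key1=value1, key2=value2}
  }
}
```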
   
    


[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382095506
 
 

 ##########
 File path: statefun-examples/statefun-async-example/src/test/java/org/apache/flink/statefun/examples/async/RunnerTest.java
 ##########
 @@ -34,7 +34,6 @@
   public void run() throws Exception {
     Harness harness =
         new Harness()
-            .noCheckpointing()
 
 Review comment:
   Yes, checkpoints are now disabled by default since I updated us to use 1.10 checkpoint configurations. 


[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382093950
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
 ##########
 @@ -17,59 +17,62 @@
  */
 package org.apache.flink.statefun.flink.core;
 
+import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+@SuppressWarnings("JavaReflectionMemberAccess")
 public class StatefulFunctionsJob {
 
+  private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+  private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
   public static void main(String... args) throws Exception {
     ParameterTool parameterTool = ParameterTool.fromArgs(args);
-    Configuration configuration = parameterTool.getConfiguration();
+    Map<String, String> globalConfigurations = parameterTool.toMap();
+
+    String configDirectory = getConfigurationDirectoryFromEnv();
+    Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+    StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+    stateFunConfig.setGlobalConfigurations(globalConfigurations);
+    stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
 
-    main(configuration);
+    main(stateFunConfig, new Configuration());
   }
 
-  public static void main(Configuration configuration) throws Exception {
-    Objects.requireNonNull(configuration);
+  public static void main(StatefulFunctionsConfig stateFunConfig, Configuration flinkConf)
+      throws Exception {
+    Objects.requireNonNull(stateFunConfig);
+    Objects.requireNonNull(flinkConf);
 
     setDefaultContextClassLoaderIfAbsent();
-    setDefaultProviderIfAbsent(
-        configuration, new StatefulFunctionsUniverses.ClassPathUniverseProvider());
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.configure(flinkConf, Thread.currentThread().getContextClassLoader());
+
+    env.getConfig().enableObjectReuse();
 
     final StatefulFunctionsUniverse statefulFunctionsUniverse =
         StatefulFunctionsUniverses.get(
-            Thread.currentThread().getContextClassLoader(), configuration);
+            Thread.currentThread().getContextClassLoader(), stateFunConfig);
 
     final StatefulFunctionsUniverseValidator statefulFunctionsUniverseValidator =
         new StatefulFunctionsUniverseValidator();
     statefulFunctionsUniverseValidator.validate(statefulFunctionsUniverse);
 
-    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-    setDefaultConfiguration(configuration, env);
-
-    FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse);
+    FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse, stateFunConfig);
     flinkUniverse.configure(env);
 
-    String jobName = configuration.getValue(StatefulFunctionsJobConstants.FLINK_JOB_NAME);
-    env.execute(jobName);
-  }
-
-  private static void setDefaultConfiguration(
-      Configuration configuration, StreamExecutionEnvironment env) {
-    env.getConfig().setGlobalJobParameters(configuration);
-    env.getConfig().enableObjectReuse();
-    final long checkpointingInterval =
-        configuration.getLong(StatefulFunctionsJobConstants.CHECKPOINTING_INTERVAL);
-    if (checkpointingInterval > 0) {
-      env.enableCheckpointing(checkpointingInterval);
 
 Review comment:
   As of 1.10, Flink supports configuring checkpoints through the flink-conf [1]. We no longer need any checkpointing-related configs in statefun.
   
   [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#checkpointing


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381936426
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
 ##########
 @@ -17,59 +17,62 @@
  */
 package org.apache.flink.statefun.flink.core;
 
+import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+@SuppressWarnings("JavaReflectionMemberAccess")
 public class StatefulFunctionsJob {
 
+  private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+  private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
   public static void main(String... args) throws Exception {
     ParameterTool parameterTool = ParameterTool.fromArgs(args);
-    Configuration configuration = parameterTool.getConfiguration();
+    Map<String, String> globalConfigurations = parameterTool.toMap();
+
+    String configDirectory = getConfigurationDirectoryFromEnv();
+    Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+    StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+    stateFunConfig.setGlobalConfigurations(globalConfigurations);
+    stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
 
-    main(configuration);
+    main(stateFunConfig, new Configuration());
 
 Review comment:
   What is the use of `new Configuration()`?
   Wouldn't it be more correct to pass the already parsed `Configuration` object that represents the local `flink-conf.yaml`?


[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382136882
 
 

 ##########
 File path: statefun-examples/statefun-flink-harness-example/src/test/java/org/apache/flink/statefun/examples/harness/RunnerTest.java
 ##########
 @@ -33,7 +33,6 @@
   public void run() throws Exception {
     Harness harness =
         new Harness()
-            .noCheckpointing()
 
 Review comment:
   yes, see above. 


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381951960
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
 ##########
 @@ -17,59 +17,62 @@
  */
 package org.apache.flink.statefun.flink.core;
 
+import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+@SuppressWarnings("JavaReflectionMemberAccess")
 public class StatefulFunctionsJob {
 
+  private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+  private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
   public static void main(String... args) throws Exception {
     ParameterTool parameterTool = ParameterTool.fromArgs(args);
-    Configuration configuration = parameterTool.getConfiguration();
+    Map<String, String> globalConfigurations = parameterTool.toMap();
+
+    String configDirectory = getConfigurationDirectoryFromEnv();
+    Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+    StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+    stateFunConfig.setGlobalConfigurations(globalConfigurations);
+    stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
 
-    main(configuration);
+    main(stateFunConfig, new Configuration());
   }
 
-  public static void main(Configuration configuration) throws Exception {
-    Objects.requireNonNull(configuration);
+  public static void main(StatefulFunctionsConfig stateFunConfig, Configuration flinkConf)
+      throws Exception {
+    Objects.requireNonNull(stateFunConfig);
+    Objects.requireNonNull(flinkConf);
 
     setDefaultContextClassLoaderIfAbsent();
-    setDefaultProviderIfAbsent(
-        configuration, new StatefulFunctionsUniverses.ClassPathUniverseProvider());
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.configure(flinkConf, Thread.currentThread().getContextClassLoader());
+
+    env.getConfig().enableObjectReuse();
 
     final StatefulFunctionsUniverse statefulFunctionsUniverse =
         StatefulFunctionsUniverses.get(
-            Thread.currentThread().getContextClassLoader(), configuration);
+            Thread.currentThread().getContextClassLoader(), stateFunConfig);
 
     final StatefulFunctionsUniverseValidator statefulFunctionsUniverseValidator =
         new StatefulFunctionsUniverseValidator();
     statefulFunctionsUniverseValidator.validate(statefulFunctionsUniverse);
 
-    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-    setDefaultConfiguration(configuration, env);
-
-    FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse);
+    FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse, stateFunConfig);
     flinkUniverse.configure(env);
 
-    String jobName = configuration.getValue(StatefulFunctionsJobConstants.FLINK_JOB_NAME);
-    env.execute(jobName);
-  }
-
-  private static void setDefaultConfiguration(
-      Configuration configuration, StreamExecutionEnvironment env) {
-    env.getConfig().setGlobalJobParameters(configuration);
-    env.getConfig().enableObjectReuse();
-    final long checkpointingInterval =
-        configuration.getLong(StatefulFunctionsJobConstants.CHECKPOINTING_INTERVAL);
-    if (checkpointingInterval > 0) {
-      env.enableCheckpointing(checkpointingInterval);
 
 Review comment:
   How would the user set the checkpointing interval now?
   Is there a Flink-specific key that users can put in `flink-conf.yaml`?
   If so, then I would re-emphasize the need to actually pass the already parsed `flink-conf.yaml` here.


[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382092653
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
 ##########
 @@ -17,59 +17,62 @@
  */
 package org.apache.flink.statefun.flink.core;
 
+import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+@SuppressWarnings("JavaReflectionMemberAccess")
 public class StatefulFunctionsJob {
 
+  private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+  private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
   public static void main(String... args) throws Exception {
     ParameterTool parameterTool = ParameterTool.fromArgs(args);
-    Configuration configuration = parameterTool.getConfiguration();
+    Map<String, String> globalConfigurations = parameterTool.toMap();
+
+    String configDirectory = getConfigurationDirectoryFromEnv();
+    Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+    StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+    stateFunConfig.setGlobalConfigurations(globalConfigurations);
+    stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
 
-    main(configuration);
+    main(stateFunConfig, new Configuration());
 
 Review comment:
   The parameter needs a better name and maybe some javadoc :) 
   
   This config is for any options you want to add to the flink-conf. It exists so that Harness-based deployments can add extra Flink configurations. 


[GitHub] [flink-statefun] tzulitai commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-589665067
 
 
   Example logs:
   ```
   Starting statefun as a console application on host 51250dbef069.
   2020-02-21 13:38:34,639 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
   2020-02-21 13:38:34,642 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Starting StatefulFunctionsClusterEntryPoint (Version: 1.10.0, Rev:aa4eb8f, Date:07.02.2020 @ 19:18:19 CET)
   2020-02-21 13:38:34,642 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  OS current user: root
   2020-02-21 13:38:34,643 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Current Hadoop/Kerberos user: <no hadoop dependency found>
   2020-02-21 13:38:34,644 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.242-b08
   2020-02-21 13:38:34,644 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Maximum heap size: 1324 MiBytes
   2020-02-21 13:38:34,644 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JAVA_HOME: /usr/local/openjdk-8
   2020-02-21 13:38:34,645 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  No Hadoop Dependency available
   2020-02-21 13:38:34,645 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM Options:
   2020-02-21 13:38:34,645 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
   2020-02-21 13:38:34,645 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
   2020-02-21 13:38:34,646 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Program Arguments:
   2020-02-21 13:38:34,646 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --configDir
   2020-02-21 13:38:34,646 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     /opt/flink/conf
   2020-02-21 13:38:34,646 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Djobmanager.rpc.address=master
   2020-02-21 13:38:34,647 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Classpath: /opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/statefun-flink-core.jar:/opt/flink/lib/statefun-flink-distribution.jar:/opt/flink/lib/flink-dist_2.12-1.10.0.jar:::
   2020-02-21 13:38:34,647 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
   2020-02-21 13:38:34,650 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Registered UNIX signal handlers for [TERM, HUP, INT]
   2020-02-21 13:38:35,008 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: classloader.parent-first-patterns.additional, org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
   2020-02-21 13:38:35,008 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, rocksdb
   2020-02-21 13:38:35,008 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.timer-service.factory, ROCKSDB
   2020-02-21 13:38:35,009 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.checkpoints.dir, file:///checkpoint-dir
   2020-02-21 13:38:35,009 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.incremental, true
   2020-02-21 13:38:35,010 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.process.size, 4g
   2020-02-21 13:38:35,118 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Starting StatefulFunctionsClusterEntryPoint.
   2020-02-21 13:38:35,119 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install default filesystem.
   2020-02-21 13:38:35,271 INFO  org.apache.flink.core.fs.FileSystem                           - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
   2020-02-21 13:38:35,315 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install security context.
   2020-02-21 13:38:35,341 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
   2020-02-21 13:38:35,372 INFO  org.apache.flink.runtime.security.modules.JaasModule          - Jaas file will be created as /tmp/jaas-3284484390599399241.conf.
   2020-02-21 13:38:35,383 INFO  org.apache.flink.runtime.security.SecurityUtils               - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
   2020-02-21 13:38:35,386 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Initializing cluster services.
   2020-02-21 13:38:35,450 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Trying to start actor system at master:6123
   2020-02-21 13:38:37,254 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
   2020-02-21 13:38:37,336 INFO  akka.remote.Remoting                                          - Starting remoting
   2020-02-21 13:38:37,827 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink@master:6123]
   2020-02-21 13:38:38,038 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Actor system started at akka.tcp://flink@master:6123
   2020-02-21 13:38:38,084 INFO  org.apache.flink.configuration.Configuration                  - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
   2020-02-21 13:38:38,106 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /tmp/blobStore-37a545e1-7157-4a42-82ea-b7aad4a5dc2d
   2020-02-21 13:38:38,114 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:37805 - max concurrent requests: 50 - max backlog: 1000
   2020-02-21 13:38:38,163 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - No metrics reporter configured, no metrics will be exposed/reported.
   2020-02-21 13:38:38,172 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Trying to start actor system at master:0
   2020-02-21 13:38:38,235 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
   2020-02-21 13:38:38,242 INFO  akka.remote.Remoting                                          - Starting remoting
   2020-02-21 13:38:38,264 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink-metrics@master:33897]
   2020-02-21 13:38:38,272 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Actor system started at akka.tcp://flink-metrics@master:33897
   2020-02-21 13:38:38,288 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/MetricQueryService .
   2020-02-21 13:38:38,519 INFO  org.apache.flink.configuration.Configuration                  - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
   2020-02-21 13:38:38,522 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - Upload directory /tmp/flink-web-9988cd25-1fa3-4e56-a1ce-c73beafa905a/flink-web-upload does not exist.
   2020-02-21 13:38:38,523 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - Created directory /tmp/flink-web-9988cd25-1fa3-4e56-a1ce-c73beafa905a/flink-web-upload for file uploads.
   2020-02-21 13:38:38,620 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - Starting rest endpoint.
   2020-02-21 13:38:39,184 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Log file environment variable 'log.file' is not set.
   2020-02-21 13:38:39,187 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (fallback keys: [{key=jobmanager.web.log.path, isDeprecated=true}])'.
   2020-02-21 13:38:39,719 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - Rest endpoint listening at master:8081
   2020-02-21 13:38:39,723 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - http://master:8081 was granted leadership with leaderSessionID=00000000-0000-0000-0000-000000000000
   2020-02-21 13:38:39,723 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - Web frontend listening at http://master:8081.
   2020-02-21 13:38:39,777 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
   2020-02-21 13:38:40,012 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - Shutting down rest endpoint.
   2020-02-21 13:38:40,216 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - Removing cache directory /tmp/flink-web-9988cd25-1fa3-4e56-a1ce-c73beafa905a/flink-web-ui
   2020-02-21 13:38:40,229 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - http://master:8081 lost leadership
   2020-02-21 13:38:40,229 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - Shut down complete.
   2020-02-21 13:38:40,237 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Shutting StatefulFunctionsClusterEntryPoint down with application status FAILED. Diagnostics org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
       at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:261)
       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:215)
       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
       at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
       at org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint.main(StatefulFunctionsClusterEntryPoint.java:94)
   Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve the JobGraph.
       at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:57)
       at org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
       at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:196)
       ... 6 more
   Caused by: org.apache.flink.util.FlinkException: Could not create the JobGraph from the provided user code jar.
       at org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:96)
       at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55)
       ... 8 more
   Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to acquire the Flink configuration from the current environment
       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
       at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
       at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
       at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
       at org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:90)
       ... 9 more
   Caused by: java.lang.RuntimeException: Failed to acquire the Flink configuration from the current environment
       at org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.getConfiguration(StatefulFunctionsConfig.java:98)
       at org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.fromEnvironment(StatefulFunctionsConfig.java:87)
       at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:36)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:498)
       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
       ... 14 more
   Caused by: java.lang.NoSuchMethodException: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getConfiguration()
       at java.lang.Class.getMethod(Class.java:1786)
       at org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.getConfiguration(StatefulFunctionsConfig.java:94)
       ... 21 more
   .
   2020-02-21 13:38:40,240 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:37805
   2020-02-21 13:38:40,242 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping Akka RPC service.
   2020-02-21 13:38:40,247 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping Akka RPC service.
   2020-02-21 13:38:40,286 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
   2020-02-21 13:38:40,293 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
   2020-02-21 13:38:40,336 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
   2020-02-21 13:38:40,345 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
   2020-02-21 13:38:40,354 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
   2020-02-21 13:38:40,358 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
   2020-02-21 13:38:40,412 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped Akka RPC service.
   2020-02-21 13:38:40,417 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped Akka RPC service.
   2020-02-21 13:38:40,423 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Could not start cluster entrypoint StatefulFunctionsClusterEntryPoint.
   org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StatefulFunctionsClusterEntryPoint.
       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
       at org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint.main(StatefulFunctionsClusterEntryPoint.java:94)
   Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
       at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:261)
       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:215)
       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
       at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
       at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
       ... 2 more
   Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve the JobGraph.
       at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:57)
       at org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
       at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:196)
       ... 6 more
   Caused by: org.apache.flink.util.FlinkException: Could not create the JobGraph from the provided user code jar.
       at org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:96)
       at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55)
       ... 8 more
   Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to acquire the Flink configuration from the current environment
       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
       at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
       at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
       at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
       at org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:90)
       ... 9 more
   Caused by: java.lang.RuntimeException: Failed to acquire the Flink configuration from the current environment
       at org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.getConfiguration(StatefulFunctionsConfig.java:98)
       at org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.fromEnvironment(StatefulFunctionsConfig.java:87)
       at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:36)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:498)
       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
       ... 14 more
   Caused by: java.lang.NoSuchMethodException: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getConfiguration()
       at java.lang.Class.getMethod(Class.java:1786)
       at org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.getConfiguration(StatefulFunctionsConfig.java:94)
       ... 21 more
   ```
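
The innermost cause in the trace is a `NoSuchMethodException` raised from `Class.getMethod`, which only finds public members. A minimal sketch of the difference between `getMethod` and `getDeclaredMethod` follows. The `Env` class is a hypothetical stand-in for a class with a non-public getter; it is not Flink code, and whether this matches the fix eventually applied in the PR is an assumption.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for a class with a non-public getter; not Flink code.
class Env {
  private String getConfiguration() {
    return "flink-conf";
  }
}

class ReflectionLookup {

  // Class#getMethod only sees public methods, so looking up the private getter fails.
  static boolean publicLookupFails() {
    try {
      Env.class.getMethod("getConfiguration");
      return false;
    } catch (NoSuchMethodException e) {
      return true;
    }
  }

  // Class#getDeclaredMethod also sees non-public methods, but only those declared
  // directly on the given class (it does not search superclasses).
  static String declaredLookup(Env env) {
    try {
      Method getter = Env.class.getDeclaredMethod("getConfiguration");
      getter.setAccessible(true);
      return (String) getter.invoke(env);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("Failed to read the configuration reflectively", e);
    }
  }

  public static void main(String[] args) {
    System.out.println("public lookup fails: " + publicLookupFails());
    System.out.println("declared lookup returns: " + declaredLookup(new Env()));
  }
}
```

Because `getDeclaredMethod` does not walk the class hierarchy, the lookup in this sketch is done on the class that declares the method rather than on the runtime class of the instance.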


[GitHub] [flink-statefun] sjwiesman commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-589215387
 
 
   @igalshilman and I discussed this offline. 
   
   Instead of discovering the flink-conf via `GlobalConfiguration`, we now access it through `StreamExecutionEnvironment#getConfiguration`. This does require reflection because the method is private, but it guarantees that we can access the config regardless of deployment method (image, standalone, session cluster, etc.). Because we only use the returned config object in a read-only manner, we deem this safe. 
   
   The call to `StreamExecutionEnvironment#configure` allows us to modify configuration values at runtime. In doing so, we can expose all configurations to Harness users via `Harness#withConfiguration`. This is the same pattern the Table API uses with `TableConfig` to expose configurations without exposing the underlying stream execution environment. Only keys that are set get overridden; if a key is not set, its existing value remains. This method is public. 
   
   I've updated the PR based on this discussion and the other miscellaneous comments in this thread. 
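
The override behaviour described above (set keys win, unset keys keep their existing value) can be illustrated with plain maps. This is only a sketch of the semantics: `ConfigOverlay` and `overlay` are hypothetical names, and the maps stand in for Flink's `Configuration`. It is not how `StreamExecutionEnvironment#configure` is implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that mimics "only keys that are set get overridden".
class ConfigOverlay {

  // Returns a copy of `base` in which every key present in `overrides`
  // replaces the base value; keys absent from `overrides` are left untouched.
  static Map<String, String> overlay(Map<String, String> base, Map<String, String> overrides) {
    Map<String, String> merged = new HashMap<>(base);
    merged.putAll(overrides);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> base = new HashMap<>();
    base.put("state.backend", "rocksdb");
    base.put("state.backend.incremental", "true");

    Map<String, String> overrides = new HashMap<>();
    overrides.put("state.backend", "filesystem");

    Map<String, String> merged = overlay(base, overrides);
    System.out.println(merged.get("state.backend"));
    System.out.println(merged.get("state.backend.incremental"));
  }
}
```

Copying the base map first keeps the original untouched, which matches the read-only use of the environment's config mentioned above.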


[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382139721
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
 ##########
 @@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+import org.apache.flink.util.InstantiationUtil;
+
+/** Configuration that captures all stateful function related settings. */
+@SuppressWarnings("WeakerAccess")
+public class StatefulFunctionsConfig implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final String MODULE_CONFIG_PREFIX = "statefun.module.config.";
+
+  // This configuration option exists for the documentation generator
+  @SuppressWarnings("unused")
+  public static final ConfigOption<String> MODULE_GLOBAL_DEFAULT =
+      ConfigOptions.key(MODULE_CONFIG_PREFIX + "<KEY>")
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              Description.builder()
+                  .text(
+                      "Adds the given key/value pair to the Stateful Functions global configuration.")
+                  .text(
+                      "These values will be available via the `globalConfigurations` parameter of StatefulFunctionModule#configure.")
+                  .linebreak()
+                  .text(
+                      "Only the key <KEY> and value are added to the configuration. If the key/value pairs")
+                  .list(
+                      code(MODULE_CONFIG_PREFIX + "key1: value1"),
+                      code(MODULE_CONFIG_PREFIX + "key2: value2"))
+                  .text("are set, then the map")
+                  .list(code("key1: value1"), code("key2: value2"))
+                  .text("will be made available to your module at runtime.")
+                  .build());
+
+  public static final ConfigOption<MessageFactoryType> USER_MESSAGE_SERIALIZER =
+      ConfigOptions.key("statefun.message.serializer")
 
 Review comment:
   No, the prefix applies only to the `globalConfigurations` passed into the module. Previously, `StatefulFunctionModule#configure` received the entire flink-conf. That is dangerous because it exposes way too many internal details and invites breaking changes, since many internal configurations are added between the dispatcher and the TM, where we were previously pulling the config from. 
   
   What I changed is that only flink-conf entries with the prefix `statefun.module.config` are passed through. I'm going to update the prefix to `statefun.module.global-config` instead. 
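
A minimal sketch of the prefix filtering described here, using plain maps. `GlobalConfigFilter` and `extract` are hypothetical names, and a `Map` stands in for the flink-conf; the actual implementation in the PR may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: keeps only prefixed entries and strips the prefix.
class GlobalConfigFilter {

  // Prefix named in the comment above, with a trailing dot as key separator.
  static final String PREFIX = "statefun.module.global-config.";

  // Returns the entries whose key starts with PREFIX, keyed by the remainder
  // of the key. All other flink-conf entries are dropped.
  static Map<String, String> extract(Map<String, String> flinkConf) {
    Map<String, String> globalConfigurations = new HashMap<>();
    for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(PREFIX) && key.length() > PREFIX.length()) {
        globalConfigurations.put(key.substring(PREFIX.length()), entry.getValue());
      }
    }
    return globalConfigurations;
  }

  public static void main(String[] args) {
    Map<String, String> flinkConf = new HashMap<>();
    flinkConf.put(PREFIX + "key1", "value1");
    flinkConf.put(PREFIX + "key2", "value2");
    flinkConf.put("state.backend", "rocksdb"); // not prefixed, so it is not passed through

    System.out.println(extract(flinkConf).keySet());
  }
}
```

With this shape, a module only ever sees `key1` and `key2`, never internal settings such as `state.backend`.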


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381932004
 
 

 ##########
 File path: statefun-examples/statefun-flink-harness-example/src/test/java/org/apache/flink/statefun/examples/harness/RunnerTest.java
 ##########
 @@ -33,7 +33,6 @@
   public void run() throws Exception {
     Harness harness =
         new Harness()
-            .noCheckpointing()
 
 Review comment:
   Is this on purpose?
   


[GitHub] [flink-statefun] sjwiesman commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-588469807
 
 
   I ran the greeter example to validate the code. I'm not really sure how else to test this, but I assume we'll make use of it in e2e tests whenever those are added. 


[GitHub] [flink-statefun] tzulitai commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382603325
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
 ##########
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+/** Configuration that captures all stateful function related settings. */
+@SuppressWarnings("WeakerAccess")
+public class StatefulFunctionsConfig implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final String MODULE_CONFIG_PREFIX = "statefun.module.global-config.";
+
+  // This configuration option exists for the documentation generator
+  @SuppressWarnings("unused")
+  public static final ConfigOption<String> MODULE_GLOBAL_DEFAULT =
+      ConfigOptions.key(MODULE_CONFIG_PREFIX + "<KEY>")
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              Description.builder()
+                  .text(
+                      "Adds the given key/value pair to the Stateful Functions global configuration.")
+                  .text(
+                      "These values will be available via the `globalConfigurations` parameter of StatefulFunctionModule#configure.")
+                  .linebreak()
+                  .text(
+                      "Only the key <KEY> and value are added to the configuration. If the key/value pairs")
+                  .list(
+                      code(MODULE_CONFIG_PREFIX + "key1: value1"),
+                      code(MODULE_CONFIG_PREFIX + "key2: value2"))
+                  .text("are set, then the map")
+                  .list(code("key1: value1"), code("key2: value2"))
+                  .text("will be made available to your module at runtime.")
+                  .build());
+
+  public static final ConfigOption<MessageFactoryType> USER_MESSAGE_SERIALIZER =
+      ConfigOptions.key("statefun.message.serializer")
+          .enumType(MessageFactoryType.class)
+          .defaultValue(MessageFactoryType.WITH_PROTOBUF_PAYLOADS)
+          .withDescription("The serializer to use for on the wire messages.");
+
+  public static final ConfigOption<String> FLINK_JOB_NAME =
+      ConfigOptions.key("statefun.flink-job-name")
+          .stringType()
+          .defaultValue("StatefulFunctions")
+          .withDescription("The name to display at the Flink-UI");
+
+  /**
+   * Creates a new {@link StatefulFunctionsConfig} based on the default configurations in the
+   * current environment set via the {@code flink-conf.yaml}.
+   */
+  public static StatefulFunctionsConfig fromEnvironment(StreamExecutionEnvironment env) {
+    Configuration configuration = getConfiguration(env);
+    return new StatefulFunctionsConfig(configuration);
+  }
+
+  @SuppressWarnings("JavaReflectionMemberAccess")
+  private static Configuration getConfiguration(StreamExecutionEnvironment env) {
+    try {
+      Method getConfiguration = StreamExecutionEnvironment.class.getMethod("getConfiguration");
 
 Review comment:
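   I think the problem is here: this should be `getDeclaredMethod`, since `getMethod` only finds public methods and `StreamExecutionEnvironment#getConfiguration` is protected.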
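
To illustrate the difference, here is a minimal, self-contained sketch. `Env` is a hypothetical stand-in class, not Flink's `StreamExecutionEnvironment`, and the helper names are made up for this example. It only demonstrates the standard `java.lang.reflect` behavior: `getMethod` searches public members, while `getDeclaredMethod` finds members of any visibility that are declared directly on the class.

```java
import java.lang.reflect.Method;

final class ReflectionLookupSketch {

  /** Returns true if the public-only lookup finds a no-arg method with this name on Env. */
  static boolean foundByGetMethod(String name) {
    try {
      Env.class.getMethod(name); // searches public members only
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  /** Invokes a no-arg method declared directly on Env, whatever its visibility. */
  static Object invokeDeclared(Env target, String name) {
    try {
      Method method = Env.class.getDeclaredMethod(name); // any visibility, declared on Env itself
      method.setAccessible(true);
      return method.invoke(target);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Unable to call " + name + " reflectively", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(foundByGetMethod("getConfiguration"));
    System.out.println(invokeDeclared(new Env(), "getConfiguration"));
  }
}

// Hypothetical stand-in for a class that exposes a protected accessor.
class Env {
  protected String getConfiguration() {
    return "flink-conf";
  }
}
```

Note that `getDeclaredMethod` does not search superclasses, so the lookup has to target the class that actually declares the method.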


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381931926
 
 

 ##########
 File path: statefun-examples/statefun-async-example/src/test/java/org/apache/flink/statefun/examples/async/RunnerTest.java
 ##########
 @@ -34,7 +34,6 @@
   public void run() throws Exception {
     Harness harness =
         new Harness()
-            .noCheckpointing()
 
 Review comment:
   Is this on purpose? 


[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382140387
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
 ##########
 @@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+import org.apache.flink.util.InstantiationUtil;
+
+/** Configuration that captures all stateful function related settings. */
+@SuppressWarnings("WeakerAccess")
+public class StatefulFunctionsConfig implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final String MODULE_CONFIG_PREFIX = "statefun.module.config.";
+
+  // This configuration option exists for the documentation generator
+  @SuppressWarnings("unused")
+  public static final ConfigOption<String> MODULE_GLOBAL_DEFAULT =
+      ConfigOptions.key(MODULE_CONFIG_PREFIX + "<KEY>")
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              Description.builder()
+                  .text(
+                      "Adds the given key/value pair to the Stateful Functions global configuration.")
+                  .text(
+                      "These values will be available via the `globalConfigurations` parameter of StatefulFunctionModule#configure.")
+                  .linebreak()
+                  .text(
+                      "Only the key <KEY> and value are added to the configuration. If the key/value pairs")
+                  .list(
+                      code(MODULE_CONFIG_PREFIX + "key1: value1"),
+                      code(MODULE_CONFIG_PREFIX + "key2: value2"))
+                  .text("are set, then the map")
+                  .list(code("key1: value1"), code("key2: value2"))
+                  .text("will be made available to your module at runtime.")
+                  .build());
+
+  public static final ConfigOption<MessageFactoryType> USER_MESSAGE_SERIALIZER =
+      ConfigOptions.key("statefun.message.serializer")
 
 Review comment:
   Take a look at `StatefulFunctionsConfigTest` and my doc updates
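
For readers without the test at hand, the prefix handling described in the option's description above can be sketched roughly as follows. This is not the PR's implementation: `GlobalConfigPrefixSketch` and `stripPrefix` are hypothetical names, and the prefix is passed in as a parameter because its exact value differs between revisions of this diff.

```java
import java.util.HashMap;
import java.util.Map;

final class GlobalConfigPrefixSketch {

  /**
   * Keeps only the entries whose key starts with the prefix and strips the prefix,
   * so that "<prefix>key1: value1" becomes "key1: value1".
   */
  static Map<String, String> stripPrefix(Map<String, String> flat, String prefix) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : flat.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(prefix)) {
        result.put(key.substring(prefix.length()), entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> flat = new HashMap<>();
    flat.put("statefun.module.config.key1", "value1");
    flat.put("statefun.module.config.key2", "value2");
    flat.put("statefun.flink-job-name", "Greeter"); // not a module setting, so it is dropped

    System.out.println(stripPrefix(flat, "statefun.module.config."));
  }
}
```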


[GitHub] [flink-statefun] tzulitai commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-589663623
 
 
   I can confirm that there's an issue with the `StreamExecutionEnvironment#getConfiguration` reflection: `NoSuchMethodError` is thrown both for a manual run of the greeter example and when using the e2e test in #28.
   
   Currently looking into it. From the logs it's confirmed that the correct Flink version (1.10.0) is being used.


[GitHub] [flink-statefun] tzulitai commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-589676426
 
 
   Can confirm it is fixed after changing to `getDeclaredMethod` 😄 
   Both the e2e test in #28 and the manual greeter example are working.
   
   +1, merging this. Thanks @sjwiesman @igalshilman!


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381935261
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
 ##########
 @@ -17,59 +17,62 @@
  */
 package org.apache.flink.statefun.flink.core;
 
+import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+@SuppressWarnings("JavaReflectionMemberAccess")
 public class StatefulFunctionsJob {
 
+  private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+  private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
   public static void main(String... args) throws Exception {
     ParameterTool parameterTool = ParameterTool.fromArgs(args);
-    Configuration configuration = parameterTool.getConfiguration();
+    Map<String, String> globalConfigurations = parameterTool.toMap();
+
+    String configDirectory = getConfigurationDirectoryFromEnv();
+    Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+    StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+    stateFunConfig.setGlobalConfigurations(globalConfigurations);
 
 Review comment:
   I would maybe call that method `mergeGlobalConfigurationsWith(..)`,
   or any other name that conveys that it is actually a union of what is already parsed and the argument.


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382100974
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
 ##########
 @@ -17,59 +17,62 @@
  */
 package org.apache.flink.statefun.flink.core;
 
+import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+@SuppressWarnings("JavaReflectionMemberAccess")
 public class StatefulFunctionsJob {
 
+  private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+  private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
   public static void main(String... args) throws Exception {
     ParameterTool parameterTool = ParameterTool.fromArgs(args);
-    Configuration configuration = parameterTool.getConfiguration();
+    Map<String, String> globalConfigurations = parameterTool.toMap();
+
+    String configDirectory = getConfigurationDirectoryFromEnv();
+    Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+    StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+    stateFunConfig.setGlobalConfigurations(globalConfigurations);
+    stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
 
-    main(configuration);
+    main(stateFunConfig, new Configuration());
   }
 
-  public static void main(Configuration configuration) throws Exception {
-    Objects.requireNonNull(configuration);
+  public static void main(StatefulFunctionsConfig stateFunConfig, Configuration flinkConf)
+      throws Exception {
+    Objects.requireNonNull(stateFunConfig);
+    Objects.requireNonNull(flinkConf);
 
     setDefaultContextClassLoaderIfAbsent();
-    setDefaultProviderIfAbsent(
-        configuration, new StatefulFunctionsUniverses.ClassPathUniverseProvider());
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.configure(flinkConf, Thread.currentThread().getContextClassLoader());
+
+    env.getConfig().enableObjectReuse();
 
     final StatefulFunctionsUniverse statefulFunctionsUniverse =
         StatefulFunctionsUniverses.get(
-            Thread.currentThread().getContextClassLoader(), configuration);
+            Thread.currentThread().getContextClassLoader(), stateFunConfig);
 
     final StatefulFunctionsUniverseValidator statefulFunctionsUniverseValidator =
         new StatefulFunctionsUniverseValidator();
     statefulFunctionsUniverseValidator.validate(statefulFunctionsUniverse);
 
-    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-    setDefaultConfiguration(configuration, env);
-
-    FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse);
+    FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse, stateFunConfig);
     flinkUniverse.configure(env);
 
-    String jobName = configuration.getValue(StatefulFunctionsJobConstants.FLINK_JOB_NAME);
-    env.execute(jobName);
-  }
-
-  private static void setDefaultConfiguration(
-      Configuration configuration, StreamExecutionEnvironment env) {
-    env.getConfig().setGlobalJobParameters(configuration);
-    env.getConfig().enableObjectReuse();
-    final long checkpointingInterval =
-        configuration.getLong(StatefulFunctionsJobConstants.CHECKPOINTING_INTERVAL);
-    if (checkpointingInterval > 0) {
-      env.enableCheckpointing(checkpointingInterval);
 
 Review comment:
   That is great, thanks for clarifying.


[GitHub] [flink-statefun] sjwiesman edited a comment on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
sjwiesman edited a comment on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-589215387
 
 
   @igalshilman and I discussed this offline. 
   
   Instead of discovering the flink-conf via `GlobalConfiguration`, we now access it through `StreamExecutionEnvironment#getConfiguration`. This requires reflection because the method is protected, but it guarantees that we can access the config regardless of deployment method (image, standalone, session cluster, etc.). Because we only use the returned config object in a read-only manner, we deem this safe. 
   
   The call to `StreamExecutionEnvironment#configure` allows us to modify the configuration values at runtime. In doing so, we can expose all configurations to Harness users via `Harness#withConfiguration`. This is the same pattern used by the TableConfig in the Table API to expose configurations without exposing the underlying stream execution environment. Only keys that are set get overridden; if a key is not set, its existing value remains. This method is public. 
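
As a rough illustration of the override semantics described above, using plain maps rather than Flink's `Configuration` class: `ConfigOverlaySketch`, `overlay`, and the keys below are hypothetical and only model the "set keys win, unset keys keep their value" behavior.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigOverlaySketch {

  /** Returns a copy of base in which every key present in overrides replaces the base value. */
  static Map<String, String> overlay(Map<String, String> base, Map<String, String> overrides) {
    Map<String, String> merged = new HashMap<>(base);
    merged.putAll(overrides); // keys absent from overrides keep their existing value
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> base = new HashMap<>();
    base.put("some.key", "from-flink-conf");
    base.put("other.key", "unchanged");

    Map<String, String> overrides = new HashMap<>();
    overrides.put("some.key", "from-harness");

    System.out.println(overlay(base, overrides));
  }
}
```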
   
   I've updated the PR based on this discussion and the other misc comment on this thread. 


[GitHub] [flink-statefun] tzulitai closed pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
tzulitai closed pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27
 
 
   


[GitHub] [flink-statefun] igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381956919
 
 

 ##########
 File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
 ##########
 @@ -51,15 +49,19 @@
   // -- configuration
   private final Map<EgressIdentifier<?>, OutputTag<Object>> sideOutputs;
 
+  private final StatefulFunctionsConfig configuration;
+
   // -- runtime
   private transient Reductions reductions;
   private transient MailboxExecutor mailboxExecutor;
 
   FunctionGroupOperator(
       Map<EgressIdentifier<?>, OutputTag<Object>> sideOutputs,
+      StatefulFunctionsConfig configuration,
       MailboxExecutor mailboxExecutor,
       ChainingStrategy chainingStrategy) {
     this.sideOutputs = Objects.requireNonNull(sideOutputs);
+    this.configuration = configuration;
 
 Review comment:
   Can you add `Objects.requireNonNull`?
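
A minimal sketch of the requested pattern, assuming a simplified constructor: `OperatorSketch` is a hypothetical class, and its `configuration` field is typed as `Object` only to keep the example self-contained.

```java
import java.util.Objects;

final class OperatorSketch {

  private final Object configuration;

  OperatorSketch(Object configuration) {
    // Fail fast at construction time instead of with a late NullPointerException at runtime.
    this.configuration = Objects.requireNonNull(configuration, "configuration");
  }

  Object configuration() {
    return configuration;
  }
}
```

With the check in place, passing `null` fails in the constructor, where the stack trace points directly at the caller that supplied the bad argument.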


[GitHub] [flink-statefun] tzulitai commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration

Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-589583732
 
 
   Thanks for the work and reviews @sjwiesman @igalshilman.
   
   Just a quick update from my side:
   I've been trying to verify the changes here, and am seeing test failures with the end-to-end test introduced in #28. I'm currently checking whether it's just something funky with that new end-to-end test and unrelated to the changes in this PR.
   
   Otherwise, I had a look over the changes and they look good to me!
