Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/19 15:57:55 UTC
[GitHub] [flink-statefun] sjwiesman opened a new pull request #27:
[FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
sjwiesman opened a new pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27
This PR updates the statefun runtime to pull all configurations from the flink-conf. Configurations no longer need to be set from the CLI (which is not easily accessible when using the Docker image), and the same global configurations are passed to all calls of `StatefulFunctionModule#configure`.
Specific changes are documented in the commit messages.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
[GitHub] [flink-statefun] igalshilman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381933672
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
##########
@@ -17,59 +17,62 @@
*/
package org.apache.flink.statefun.flink.core;
+import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+@SuppressWarnings("JavaReflectionMemberAccess")
public class StatefulFunctionsJob {
+ private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
Review comment:
In addition, can you please explain why "../conf" is a fallback?
I guess it has something to do with Maven and `target`?
[GitHub] [flink-statefun] igalshilman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382100607
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
##########
@@ -17,59 +17,62 @@
*/
package org.apache.flink.statefun.flink.core;
+import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+@SuppressWarnings("JavaReflectionMemberAccess")
public class StatefulFunctionsJob {
+ private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+ private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
public static void main(String... args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
- Configuration configuration = parameterTool.getConfiguration();
+ Map<String, String> globalConfigurations = parameterTool.toMap();
+
+ String configDirectory = getConfigurationDirectoryFromEnv();
+ Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+ StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+ stateFunConfig.setGlobalConfigurations(globalConfigurations);
+ stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
- main(configuration);
+ main(stateFunConfig, new Configuration());
Review comment:
The source of my confusion is the `new Configuration()`:
it is empty, while we could use the one that you've just obtained from `flink-conf.yaml`.
The reason I'm asking is that down the line you'll be calling
`env.configure(..)`. Wouldn't it be better to pass the configuration parsed from flink-conf?
[GitHub] [flink-statefun] igalshilman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381932931
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
##########
@@ -17,59 +17,62 @@
*/
package org.apache.flink.statefun.flink.core;
+import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+@SuppressWarnings("JavaReflectionMemberAccess")
public class StatefulFunctionsJob {
+ private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
Review comment:
Can all the `flink-conf.yaml`-related search and parsing be extracted to an external class,
perhaps `ConfUtils` or something like that?
Also, it should probably log where it got its configuration from.
[GitHub] [flink-statefun] igalshilman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381955581
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
##########
@@ -82,14 +85,37 @@ private static void setDefaultContextClassLoaderIfAbsent() {
}
}
- private static void setDefaultProviderIfAbsent(
- Configuration configuration, StatefulFunctionsUniverseProvider provider) {
- if (!configuration.contains(
- StatefulFunctionsJobConstants.STATEFUL_FUNCTIONS_UNIVERSE_INITIALIZER_CLASS_BYTES)) {
- ConfigurationUtil.storeSerializedInstance(
- configuration,
- StatefulFunctionsJobConstants.STATEFUL_FUNCTIONS_UNIVERSE_INITIALIZER_CLASS_BYTES,
- provider);
+ /**
+ * Finds the location of the flink-conf. The fallback keys are required to find the configuration
+ * in non-image based deployments; (i.e., anything using the flink cli).
+ *
+ * <p>Taken from org.apache.flink.client.cli.CliFrontend
+ */
+ private static String getConfigurationDirectoryFromEnv() {
+ String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+
+ if (location != null) {
+ if (new File(location).exists()) {
+ return location;
+ } else {
+ throw new RuntimeException(
+ "The configuration directory '"
+ + location
+ + "', specified in the '"
+ + ConfigConstants.ENV_FLINK_CONF_DIR
+ + "' environment variable, does not exist.");
+ }
+ } else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
Review comment:
I think it would be better to add a log message here that says which configuration location was selected:
the environment variable or one of the fallbacks.
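A minimal sketch of what such logging could look like, assuming the lookup is extracted into a helper class. The class name `ConfigDirectoryResolver`, the `resolve` method, and the injected existence check are hypothetical and not part of the PR. The sketch uses `java.util.logging` only to stay self-contained, whereas Flink itself logs through SLF4J. The behaviour when neither the environment variable nor a fallback is found is also an assumption, since the quoted diff is cut off before that branch.

```java
import java.io.File;
import java.util.function.Predicate;
import java.util.logging.Logger;

/** Hypothetical helper, for illustration only; not taken from the PR. */
final class ConfigDirectoryResolver {
  private static final Logger LOG =
      Logger.getLogger(ConfigDirectoryResolver.class.getName());

  // Same value as ConfigConstants.ENV_FLINK_CONF_DIR in Flink.
  static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";

  // The two fallbacks from the diff, tried in this order.
  static final String[] FALLBACKS = {"../conf", "conf"};

  /**
   * Picks the configuration directory and logs where it came from.
   * The existence check is injected so the logic can be exercised without touching the disk.
   */
  static String resolve(String envValue, Predicate<String> exists) {
    if (envValue != null) {
      if (!exists.test(envValue)) {
        throw new IllegalStateException(
            "The configuration directory '" + envValue + "', specified in the '"
                + ENV_FLINK_CONF_DIR + "' environment variable, does not exist.");
      }
      LOG.info("Using configuration directory '" + envValue
          + "' from environment variable " + ENV_FLINK_CONF_DIR);
      return envValue;
    }
    for (String fallback : FALLBACKS) {
      if (exists.test(fallback)) {
        LOG.info("Using fallback configuration directory '" + fallback + "'");
        return fallback;
      }
    }
    // Assumption: fail loudly when nothing is found.
    throw new IllegalStateException(
        "No configuration directory found; set " + ENV_FLINK_CONF_DIR + ".");
  }

  /** Convenience entry point that reads the real environment and file system. */
  static String resolveFromEnvironment() {
    return resolve(System.getenv(ENV_FLINK_CONF_DIR), path -> new File(path).exists());
  }
}
```

Injecting the existence check keeps the selection order testable; production code would call `resolveFromEnvironment()`.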
[GitHub] [flink-statefun] igalshilman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381927045
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+import org.apache.flink.util.InstantiationUtil;
+
+/** Configuration that captures all stateful function related settings. */
+@SuppressWarnings("WeakerAccess")
+public class StatefulFunctionsConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String MODULE_CONFIG_PREFIX = "statefun.module.config.";
+
+ // This configuration option exists for the documentation generator
+ @SuppressWarnings("unused")
+ public static final ConfigOption<String> MODULE_GLOBAL_DEFAULT =
+ ConfigOptions.key(MODULE_CONFIG_PREFIX + "<KEY>")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Adds the given key/value pair to the Stateful Functions global configuration.")
+ .text(
+ "These values will be available via the `globalConfigurations` parameter of StatefulFunctionModule#configure.")
+ .linebreak()
+ .text(
+ "Only the key <KEY> and value are added to the configuration. If the key/value pairs")
+ .list(
+ code(MODULE_CONFIG_PREFIX + "key1: value1"),
+ code(MODULE_CONFIG_PREFIX + "key2: value2"))
+ .text("are set, then the map")
+ .list(code("key1: value1"), code("key2: value2"))
+ .text("will be made available to your module at runtime.")
+ .build());
+
+ public static final ConfigOption<MessageFactoryType> USER_MESSAGE_SERIALIZER =
+ ConfigOptions.key("statefun.message.serializer")
Review comment:
I guess we don't need the `statefun` prefix now, since we have the `statefun.module.config.`
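For context, the prefix handling described in the `MODULE_GLOBAL_DEFAULT` description in the diff above can be sketched with plain maps. This is only an illustration under assumptions: the class and method names are hypothetical, and the actual implementation in `StatefulFunctionsConfig` is not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the prefix stripping; not the PR's code. */
final class ModuleConfigPrefixExample {
  static final String MODULE_CONFIG_PREFIX = "statefun.module.config.";

  /**
   * Keeps only entries whose key starts with the module prefix and
   * removes that prefix from the key, as the option description states.
   */
  static Map<String, String> extractGlobalConfigurations(Map<String, String> flinkConf) {
    Map<String, String> globalConfigurations = new HashMap<>();
    for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
      String key = entry.getKey();
      // Skip unrelated keys and a bare prefix with no key after it.
      if (key.startsWith(MODULE_CONFIG_PREFIX)
          && key.length() > MODULE_CONFIG_PREFIX.length()) {
        globalConfigurations.put(
            key.substring(MODULE_CONFIG_PREFIX.length()), entry.getValue());
      }
    }
    return globalConfigurations;
  }
}
```

With `statefun.module.config.key1: value1` and `statefun.module.config.key2: value2` set, a module would see `key1` and `key2`, while a key such as `statefun.message.serializer` is left out of the map.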
[GitHub] [flink-statefun] sjwiesman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382095506
##########
File path: statefun-examples/statefun-async-example/src/test/java/org/apache/flink/statefun/examples/async/RunnerTest.java
##########
@@ -34,7 +34,6 @@
public void run() throws Exception {
Harness harness =
new Harness()
- .noCheckpointing()
Review comment:
Yes, checkpoints are now disabled by default since I updated us to use 1.10 checkpoint configurations.
[GitHub] [flink-statefun] sjwiesman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382093950
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
##########
@@ -17,59 +17,62 @@
*/
package org.apache.flink.statefun.flink.core;
+import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+@SuppressWarnings("JavaReflectionMemberAccess")
public class StatefulFunctionsJob {
+ private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+ private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
public static void main(String... args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
- Configuration configuration = parameterTool.getConfiguration();
+ Map<String, String> globalConfigurations = parameterTool.toMap();
+
+ String configDirectory = getConfigurationDirectoryFromEnv();
+ Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+ StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+ stateFunConfig.setGlobalConfigurations(globalConfigurations);
+ stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
- main(configuration);
+ main(stateFunConfig, new Configuration());
}
- public static void main(Configuration configuration) throws Exception {
- Objects.requireNonNull(configuration);
+ public static void main(StatefulFunctionsConfig stateFunConfig, Configuration flinkConf)
+ throws Exception {
+ Objects.requireNonNull(stateFunConfig);
+ Objects.requireNonNull(flinkConf);
setDefaultContextClassLoaderIfAbsent();
- setDefaultProviderIfAbsent(
- configuration, new StatefulFunctionsUniverses.ClassPathUniverseProvider());
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.configure(flinkConf, Thread.currentThread().getContextClassLoader());
+
+ env.getConfig().enableObjectReuse();
final StatefulFunctionsUniverse statefulFunctionsUniverse =
StatefulFunctionsUniverses.get(
- Thread.currentThread().getContextClassLoader(), configuration);
+ Thread.currentThread().getContextClassLoader(), stateFunConfig);
final StatefulFunctionsUniverseValidator statefulFunctionsUniverseValidator =
new StatefulFunctionsUniverseValidator();
statefulFunctionsUniverseValidator.validate(statefulFunctionsUniverse);
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- setDefaultConfiguration(configuration, env);
-
- FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse);
+ FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse, stateFunConfig);
flinkUniverse.configure(env);
- String jobName = configuration.getValue(StatefulFunctionsJobConstants.FLINK_JOB_NAME);
- env.execute(jobName);
- }
-
- private static void setDefaultConfiguration(
- Configuration configuration, StreamExecutionEnvironment env) {
- env.getConfig().setGlobalJobParameters(configuration);
- env.getConfig().enableObjectReuse();
- final long checkpointingInterval =
- configuration.getLong(StatefulFunctionsJobConstants.CHECKPOINTING_INTERVAL);
- if (checkpointingInterval > 0) {
- env.enableCheckpointing(checkpointingInterval);
Review comment:
As of 1.10, Flink supports configuring checkpoints through the flink-conf [1]. We no longer need any checkpointing-related configs in statefun.
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#checkpointing
[GitHub] [flink-statefun] igalshilman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381936426
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
##########
@@ -17,59 +17,62 @@
*/
package org.apache.flink.statefun.flink.core;
+import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+@SuppressWarnings("JavaReflectionMemberAccess")
public class StatefulFunctionsJob {
+ private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+ private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
public static void main(String... args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
- Configuration configuration = parameterTool.getConfiguration();
+ Map<String, String> globalConfigurations = parameterTool.toMap();
+
+ String configDirectory = getConfigurationDirectoryFromEnv();
+ Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+ StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+ stateFunConfig.setGlobalConfigurations(globalConfigurations);
+ stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
- main(configuration);
+ main(stateFunConfig, new Configuration());
Review comment:
What is the use of `new Configuration()`?
Wouldn't it be more correct to pass the already parsed `Configuration` object that represents the local `flink-conf.yaml`?
[GitHub] [flink-statefun] sjwiesman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382136882
##########
File path: statefun-examples/statefun-flink-harness-example/src/test/java/org/apache/flink/statefun/examples/harness/RunnerTest.java
##########
@@ -33,7 +33,6 @@
public void run() throws Exception {
Harness harness =
new Harness()
- .noCheckpointing()
Review comment:
yes, see above.
[GitHub] [flink-statefun] igalshilman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381951960
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
##########
@@ -17,59 +17,62 @@
*/
package org.apache.flink.statefun.flink.core;
+import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+@SuppressWarnings("JavaReflectionMemberAccess")
public class StatefulFunctionsJob {
+ private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+ private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
public static void main(String... args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
- Configuration configuration = parameterTool.getConfiguration();
+ Map<String, String> globalConfigurations = parameterTool.toMap();
+
+ String configDirectory = getConfigurationDirectoryFromEnv();
+ Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+ StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+ stateFunConfig.setGlobalConfigurations(globalConfigurations);
+ stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
- main(configuration);
+ main(stateFunConfig, new Configuration());
}
- public static void main(Configuration configuration) throws Exception {
- Objects.requireNonNull(configuration);
+ public static void main(StatefulFunctionsConfig stateFunConfig, Configuration flinkConf)
+ throws Exception {
+ Objects.requireNonNull(stateFunConfig);
+ Objects.requireNonNull(flinkConf);
setDefaultContextClassLoaderIfAbsent();
- setDefaultProviderIfAbsent(
- configuration, new StatefulFunctionsUniverses.ClassPathUniverseProvider());
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.configure(flinkConf, Thread.currentThread().getContextClassLoader());
+
+ env.getConfig().enableObjectReuse();
final StatefulFunctionsUniverse statefulFunctionsUniverse =
StatefulFunctionsUniverses.get(
- Thread.currentThread().getContextClassLoader(), configuration);
+ Thread.currentThread().getContextClassLoader(), stateFunConfig);
final StatefulFunctionsUniverseValidator statefulFunctionsUniverseValidator =
new StatefulFunctionsUniverseValidator();
statefulFunctionsUniverseValidator.validate(statefulFunctionsUniverse);
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- setDefaultConfiguration(configuration, env);
-
- FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse);
+ FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse, stateFunConfig);
flinkUniverse.configure(env);
- String jobName = configuration.getValue(StatefulFunctionsJobConstants.FLINK_JOB_NAME);
- env.execute(jobName);
- }
-
- private static void setDefaultConfiguration(
- Configuration configuration, StreamExecutionEnvironment env) {
- env.getConfig().setGlobalJobParameters(configuration);
- env.getConfig().enableObjectReuse();
- final long checkpointingInterval =
- configuration.getLong(StatefulFunctionsJobConstants.CHECKPOINTING_INTERVAL);
- if (checkpointingInterval > 0) {
- env.enableCheckpointing(checkpointingInterval);
Review comment:
How would the user set the checkpointing interval now?
Is there a Flink-specific key that users can put in `flink-conf.yaml`?
If so, then I would re-emphasize the need to actually pass the already parsed `flink-conf.yaml` here.
[GitHub] [flink-statefun] sjwiesman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382092653
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
##########
@@ -17,59 +17,62 @@
*/
package org.apache.flink.statefun.flink.core;
+import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+@SuppressWarnings("JavaReflectionMemberAccess")
public class StatefulFunctionsJob {
+ private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+ private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
public static void main(String... args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
- Configuration configuration = parameterTool.getConfiguration();
+ Map<String, String> globalConfigurations = parameterTool.toMap();
+
+ String configDirectory = getConfigurationDirectoryFromEnv();
+ Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+ StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+ stateFunConfig.setGlobalConfigurations(globalConfigurations);
+ stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
- main(configuration);
+ main(stateFunConfig, new Configuration());
Review comment:
The parameter needs a better name and maybe some javadoc :)
This config is for any options you want to add on top of the flink-conf. It exists so that Harness-based deployments can add extra Flink configurations.
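The layering described here can be sketched with plain maps standing in for Flink's `Configuration`. This is an illustration under assumptions rather than the PR's code: the class and method names are hypothetical, and it assumes that extra options take precedence over values loaded from the flink-conf. The keys `execution.checkpointing.interval` and `parallelism.default` are standard Flink options.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of layering extra options over the flink-conf. */
final class LayeredFlinkConfExample {

  /**
   * Returns the flink-conf entries with the extra options applied on top.
   * Assumption: on a key clash the extra option wins.
   */
  static Map<String, String> layer(Map<String, String> flinkConf, Map<String, String> extra) {
    Map<String, String> merged = new LinkedHashMap<>(flinkConf);
    merged.putAll(extra);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> flinkConf = new LinkedHashMap<>();
    flinkConf.put("execution.checkpointing.interval", "30s");
    flinkConf.put("parallelism.default", "2");

    // A Harness-style caller adds its own option; the job entry point would pass none.
    Map<String, String> extra = new LinkedHashMap<>();
    extra.put("parallelism.default", "1");

    System.out.println(layer(flinkConf, extra));
  }
}
```

Under this reading, passing an empty set of extra options, as `main(String... args)` does with `new Configuration()`, leaves the values from the flink-conf untouched.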
[GitHub] [flink-statefun] tzulitai commented on issue #27:
[FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-589665067
Example logs:
```
Starting statefun as a console application on host 51250dbef069.
2020-02-21 13:38:34,639 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --------------------------------------------------------------------------------
2020-02-21 13:38:34,642 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting StatefulFunctionsClusterEntryPoint (Version: 1.10.0, Rev:aa4eb8f, Date:07.02.2020 @ 19:18:19 CET)
2020-02-21 13:38:34,642 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - OS current user: root
2020-02-21 13:38:34,643 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Current Hadoop/Kerberos user: <no hadoop dependency found>
2020-02-21 13:38:34,644 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.242-b08
2020-02-21 13:38:34,644 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Maximum heap size: 1324 MiBytes
2020-02-21 13:38:34,644 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JAVA_HOME: /usr/local/openjdk-8
2020-02-21 13:38:34,645 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - No Hadoop Dependency available
2020-02-21 13:38:34,645 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM Options:
2020-02-21 13:38:34,645 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2020-02-21 13:38:34,645 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2020-02-21 13:38:34,646 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Program Arguments:
2020-02-21 13:38:34,646 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --configDir
2020-02-21 13:38:34,646 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - /opt/flink/conf
2020-02-21 13:38:34,646 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Djobmanager.rpc.address=master
2020-02-21 13:38:34,647 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Classpath: /opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/statefun-flink-core.jar:/opt/flink/lib/statefun-flink-distribution.jar:/opt/flink/lib/flink-dist_2.12-1.10.0.jar:::
2020-02-21 13:38:34,647 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --------------------------------------------------------------------------------
2020-02-21 13:38:34,650 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Registered UNIX signal handlers for [TERM, HUP, INT]
2020-02-21 13:38:35,008 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: classloader.parent-first-patterns.additional, org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
2020-02-21 13:38:35,008 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.backend, rocksdb
2020-02-21 13:38:35,008 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.backend.rocksdb.timer-service.factory, ROCKSDB
2020-02-21 13:38:35,009 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.checkpoints.dir, file:///checkpoint-dir
2020-02-21 13:38:35,009 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.backend.incremental, true
2020-02-21 13:38:35,010 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.memory.process.size, 4g
2020-02-21 13:38:35,118 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting StatefulFunctionsClusterEntryPoint.
2020-02-21 13:38:35,119 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Install default filesystem.
2020-02-21 13:38:35,271 INFO org.apache.flink.core.fs.FileSystem - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2020-02-21 13:38:35,315 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Install security context.
2020-02-21 13:38:35,341 INFO org.apache.flink.runtime.security.modules.HadoopModuleFactory - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2020-02-21 13:38:35,372 INFO org.apache.flink.runtime.security.modules.JaasModule - Jaas file will be created as /tmp/jaas-3284484390599399241.conf.
2020-02-21 13:38:35,383 INFO org.apache.flink.runtime.security.SecurityUtils - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2020-02-21 13:38:35,386 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Initializing cluster services.
2020-02-21 13:38:35,450 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start actor system at master:6123
2020-02-21 13:38:37,254 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
2020-02-21 13:38:37,336 INFO akka.remote.Remoting - Starting remoting
2020-02-21 13:38:37,827 INFO akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://flink@master:6123]
2020-02-21 13:38:38,038 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor system started at akka.tcp://flink@master:6123
2020-02-21 13:38:38,084 INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2020-02-21 13:38:38,106 INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /tmp/blobStore-37a545e1-7157-4a42-82ea-b7aad4a5dc2d
2020-02-21 13:38:38,114 INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:37805 - max concurrent requests: 50 - max backlog: 1000
2020-02-21 13:38:38,163 INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - No metrics reporter configured, no metrics will be exposed/reported.
2020-02-21 13:38:38,172 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start actor system at master:0
2020-02-21 13:38:38,235 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
2020-02-21 13:38:38,242 INFO akka.remote.Remoting - Starting remoting
2020-02-21 13:38:38,264 INFO akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://flink-metrics@master:33897]
2020-02-21 13:38:38,272 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor system started at akka.tcp://flink-metrics@master:33897
2020-02-21 13:38:38,288 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/MetricQueryService .
2020-02-21 13:38:38,519 INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2020-02-21 13:38:38,522 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - Upload directory /tmp/flink-web-9988cd25-1fa3-4e56-a1ce-c73beafa905a/flink-web-upload does not exist.
2020-02-21 13:38:38,523 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - Created directory /tmp/flink-web-9988cd25-1fa3-4e56-a1ce-c73beafa905a/flink-web-upload for file uploads.
2020-02-21 13:38:38,620 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - Starting rest endpoint.
2020-02-21 13:38:39,184 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set.
2020-02-21 13:38:39,187 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (fallback keys: [{key=jobmanager.web.log.path, isDeprecated=true}])'.
2020-02-21 13:38:39,719 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - Rest endpoint listening at master:8081
2020-02-21 13:38:39,723 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - http://master:8081 was granted leadership with leaderSessionID=00000000-0000-0000-0000-000000000000
2020-02-21 13:38:39,723 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - Web frontend listening at http://master:8081.
2020-02-21 13:38:39,777 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2020-02-21 13:38:40,012 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - Shutting down rest endpoint.
2020-02-21 13:38:40,216 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - Removing cache directory /tmp/flink-web-9988cd25-1fa3-4e56-a1ce-c73beafa905a/flink-web-ui
2020-02-21 13:38:40,229 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - http://master:8081 lost leadership
2020-02-21 13:38:40,229 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - Shut down complete.
2020-02-21 13:38:40,237 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting StatefulFunctionsClusterEntryPoint down with application status FAILED. Diagnostics org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:261)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:215)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
at org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint.main(StatefulFunctionsClusterEntryPoint.java:94)
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve the JobGraph.
at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:57)
at org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:196)
... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not create the JobGraph from the provided user code jar.
at org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:96)
at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55)
... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to acquire the Flink configuration from the current environment
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
at org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:90)
... 9 more
Caused by: java.lang.RuntimeException: Failed to acquire the Flink configuration from the current environment
at org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.getConfiguration(StatefulFunctionsConfig.java:98)
at org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.fromEnvironment(StatefulFunctionsConfig.java:87)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 14 more
Caused by: java.lang.NoSuchMethodException: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getConfiguration()
at java.lang.Class.getMethod(Class.java:1786)
at org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.getConfiguration(StatefulFunctionsConfig.java:94)
... 21 more
.
2020-02-21 13:38:40,240 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:37805
2020-02-21 13:38:40,242 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
2020-02-21 13:38:40,247 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
2020-02-21 13:38:40,286 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
2020-02-21 13:38:40,293 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
2020-02-21 13:38:40,336 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
2020-02-21 13:38:40,345 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
2020-02-21 13:38:40,354 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
2020-02-21 13:38:40,358 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
2020-02-21 13:38:40,412 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
2020-02-21 13:38:40,417 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
2020-02-21 13:38:40,423 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not start cluster entrypoint StatefulFunctionsClusterEntryPoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StatefulFunctionsClusterEntryPoint.
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
at org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint.main(StatefulFunctionsClusterEntryPoint.java:94)
Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:261)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:215)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
... 2 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve the JobGraph.
at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:57)
at org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:196)
... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not create the JobGraph from the provided user code jar.
at org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:96)
at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55)
... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to acquire the Flink configuration from the current environment
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
at org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:90)
... 9 more
Caused by: java.lang.RuntimeException: Failed to acquire the Flink configuration from the current environment
at org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.getConfiguration(StatefulFunctionsConfig.java:98)
at org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.fromEnvironment(StatefulFunctionsConfig.java:87)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 14 more
Caused by: java.lang.NoSuchMethodException: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getConfiguration()
at java.lang.Class.getMethod(Class.java:1786)
at org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.getConfiguration(StatefulFunctionsConfig.java:94)
... 21 more
```
----------------------------------------------------------------
[GitHub] [flink-statefun] sjwiesman commented on issue #27:
[FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
sjwiesman commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-589215387
@igalshilman and I discussed this offline.
Instead of discovering the flink-conf via `GlobalConfiguration`, we now access it from `StreamExecutionEnvironment#getConfiguration`. This requires reflection because the method is not public, but it guarantees that we can access the config regardless of deployment method (image, standalone, session cluster, etc.). Because we only read from the returned config object, we deem this safe.
The call to `StreamExecutionEnvironment#configure` allows us to modify the configuration values at runtime. In doing so, we can expose all configurations to Harness users via `Harness#withConfiguration`. This is the same pattern the Table API's `TableConfig` uses to expose configurations without exposing the underlying stream execution environment. Only keys that are set get overridden; if a key is not set, its existing value remains. This method is public.
I've updated the PR based on this discussion and the other miscellaneous comments on this thread.
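To make the override rule concrete ("only keys that are set get overridden"), here is a minimal sketch. It uses plain maps rather than Flink's `Configuration`, so it only models the behavior described above and says nothing about how Flink implements `configure`. The class name `ConfigOverlaySketch`, the method `overlay`, and the sample values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the override rule; not Flink code.
final class ConfigOverlaySketch {

  /** Returns a copy of base in which every key present in overrides replaces the base value. */
  static Map<String, String> overlay(Map<String, String> base, Map<String, String> overrides) {
    Map<String, String> merged = new LinkedHashMap<>(base); // copy, so base is left untouched
    merged.putAll(overrides); // keys absent from overrides keep their base value
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> flinkConf = new LinkedHashMap<>();
    flinkConf.put("state.backend", "rocksdb");
    flinkConf.put("state.checkpoints.dir", "file:///checkpoint-dir");

    // Stand-in for the extra options a Harness user might supply (assumed value).
    Map<String, String> harnessOverrides = new LinkedHashMap<>();
    harnessOverrides.put("state.backend", "filesystem");

    System.out.println(overlay(flinkConf, harnessOverrides));
  }
}
```

In this model `state.backend` takes the overriding value while `state.checkpoints.dir` keeps the value from the base configuration.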
----------------------------------------------------------------
[GitHub] [flink-statefun] sjwiesman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382139721
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+import org.apache.flink.util.InstantiationUtil;
+
+/** Configuration that captures all stateful function related settings. */
+@SuppressWarnings("WeakerAccess")
+public class StatefulFunctionsConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String MODULE_CONFIG_PREFIX = "statefun.module.config.";
+
+ // This configuration option exists for the documentation generator
+ @SuppressWarnings("unused")
+ public static final ConfigOption<String> MODULE_GLOBAL_DEFAULT =
+ ConfigOptions.key(MODULE_CONFIG_PREFIX + "<KEY>")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Adds the given key/value pair to the Stateful Functions global configuration.")
+ .text(
+ "These values will be available via the `globalConfigurations` parameter of StatefulFunctionModule#configure.")
+ .linebreak()
+ .text(
+ "Only the key <KEY> and value are added to the configuration. If the key/value pairs")
+ .list(
+ code(MODULE_CONFIG_PREFIX + "key1: value1"),
+ code(MODULE_CONFIG_PREFIX + "key2: value2"))
+ .text("are set, then the map")
+ .list(code("key1: value1"), code("key2: value2"))
+ .text("will be made available to your module at runtime.")
+ .build());
+
+ public static final ConfigOption<MessageFactoryType> USER_MESSAGE_SERIALIZER =
+ ConfigOptions.key("statefun.message.serializer")
Review comment:
No, the prefix is only for the `globalConfigurations` passed into the module. Previously, `StatefulFunctionModule#configure` was getting the entire flink conf. That is dangerous because it exposes way too many internal details and risks breaking changes, since lots of internal configurations are added between the dispatcher and the TM, where we were previously pulling from.
I updated this to pass through only flink-conf entries with the prefix `statefun.module.config`. I'm going to change the prefix to `statefun.module.global-config` instead.
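The prefix-based pass-through described above could look roughly like the following. This is a sketch under assumptions, not the PR's actual code: it works on a plain `Map` instead of Flink's `Configuration`, and the class name `GlobalConfigPrefixSketch` and method `extractGlobalConfig` are hypothetical. The prefix string, including its trailing dot, follows the `MODULE_CONFIG_PREFIX` constant quoted elsewhere in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of filtering flink-conf entries by prefix; not the PR's code.
final class GlobalConfigPrefixSketch {

  // The trailing dot separates the prefix from the user-chosen key.
  static final String PREFIX = "statefun.module.global-config.";

  /** Keeps only entries whose key starts with PREFIX and strips the prefix from each key. */
  static Map<String, String> extractGlobalConfig(Map<String, String> flinkConf) {
    Map<String, String> globalConfig = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(PREFIX)) {
        globalConfig.put(key.substring(PREFIX.length()), entry.getValue());
      }
    }
    return globalConfig;
  }

  public static void main(String[] args) {
    Map<String, String> flinkConf = new LinkedHashMap<>();
    flinkConf.put("state.backend", "rocksdb"); // internal setting, must not reach the module
    flinkConf.put(PREFIX + "key1", "value1");
    flinkConf.put(PREFIX + "key2", "value2");

    System.out.println(extractGlobalConfig(flinkConf));
  }
}
```

With this input the module would see only `key1` and `key2`; `state.backend` and any other unprefixed Flink setting stay hidden.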
----------------------------------------------------------------
[GitHub] [flink-statefun] igalshilman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381932004
##########
File path: statefun-examples/statefun-flink-harness-example/src/test/java/org/apache/flink/statefun/examples/harness/RunnerTest.java
##########
@@ -33,7 +33,6 @@
public void run() throws Exception {
Harness harness =
new Harness()
- .noCheckpointing()
Review comment:
Is this on purpose?
----------------------------------------------------------------
[GitHub] [flink-statefun] sjwiesman commented on issue #27:
[FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
sjwiesman commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-588469807
I ran the greeter example to validate the code. I'm not really sure how else to test this, but I assume we'll make use of this in e2e tests whenever those are added.
----------------------------------------------------------------
[GitHub] [flink-statefun] tzulitai commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382603325
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+/** Configuration that captures all stateful function related settings. */
+@SuppressWarnings("WeakerAccess")
+public class StatefulFunctionsConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String MODULE_CONFIG_PREFIX = "statefun.module.global-config.";
+
+ // This configuration option exists for the documentation generator
+ @SuppressWarnings("unused")
+ public static final ConfigOption<String> MODULE_GLOBAL_DEFAULT =
+ ConfigOptions.key(MODULE_CONFIG_PREFIX + "<KEY>")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Adds the given key/value pair to the Stateful Functions global configuration.")
+ .text(
+ "These values will be available via the `globalConfigurations` parameter of StatefulFunctionModule#configure.")
+ .linebreak()
+ .text(
+ "Only the key <KEY> and value are added to the configuration. If the key/value pairs")
+ .list(
+ code(MODULE_CONFIG_PREFIX + "key1: value1"),
+ code(MODULE_CONFIG_PREFIX + "key2: value2"))
+ .text("are set, then the map")
+ .list(code("key1: value1"), code("key2: value2"))
+ .text("will be made available to your module at runtime.")
+ .build());
+
+ public static final ConfigOption<MessageFactoryType> USER_MESSAGE_SERIALIZER =
+ ConfigOptions.key("statefun.message.serializer")
+ .enumType(MessageFactoryType.class)
+ .defaultValue(MessageFactoryType.WITH_PROTOBUF_PAYLOADS)
+ .withDescription("The serializer to use for on the wire messages.");
+
+ public static final ConfigOption<String> FLINK_JOB_NAME =
+ ConfigOptions.key("statefun.flink-job-name")
+ .stringType()
+ .defaultValue("StatefulFunctions")
+ .withDescription("The name to display at the Flink-UI");
+
+ /**
+ * Creates a new {@link StatefulFunctionsConfig} based on the default configurations in the
+ * current environment set via the {@code flink-conf.yaml}.
+ */
+ public static StatefulFunctionsConfig fromEnvironment(StreamExecutionEnvironment env) {
+ Configuration configuration = getConfiguration(env);
+ return new StatefulFunctionsConfig(configuration);
+ }
+
+ @SuppressWarnings("JavaReflectionMemberAccess")
+ private static Configuration getConfiguration(StreamExecutionEnvironment env) {
+ try {
+ Method getConfiguration = StreamExecutionEnvironment.class.getMethod("getConfiguration");
Review comment:
I think the problem is here: should be `getDeclaredMethod`
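For context, `Class#getMethod` only returns public methods, so looking up a non-public method with it throws `NoSuchMethodException`, while `Class#getDeclaredMethod` finds any method declared directly on the given class. The sketch below shows the difference on a hypothetical stand-in class `Env` (not Flink's `StreamExecutionEnvironment`); the helper names are also made up for illustration.

```java
import java.lang.reflect.Method;

// Hypothetical demonstration of getMethod vs. getDeclaredMethod; not Flink code.
final class ReflectionLookupSketch {

  /** Stand-in for a class that exposes a non-public getter. */
  static class Env {
    private final String configuration = "flink-conf";

    protected String getConfiguration() {
      return configuration;
    }
  }

  /** Returns true if Class#getMethod can see the named no-arg method. */
  static boolean visibleViaGetMethod(Class<?> type, String name) {
    try {
      type.getMethod(name); // only searches public methods
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  /** Looks the no-arg method up on the declaring class and invokes it reflectively. */
  static Object invokeDeclared(Class<?> declaringType, Object target, String name) {
    try {
      Method method = declaringType.getDeclaredMethod(name); // sees non-public methods too
      method.setAccessible(true); // required before invoking a non-public method
      return method.invoke(target);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("Failed to invoke " + name, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(visibleViaGetMethod(Env.class, "getConfiguration"));
    System.out.println(invokeDeclared(Env.class, new Env(), "getConfiguration"));
  }
}
```

Note that `getDeclaredMethod` does not search superclasses, so the lookup has to target the class that declares the method rather than the runtime class of a subclass instance.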
----------------------------------------------------------------
[GitHub] [flink-statefun] igalshilman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381931926
##########
File path: statefun-examples/statefun-async-example/src/test/java/org/apache/flink/statefun/examples/async/RunnerTest.java
##########
@@ -34,7 +34,6 @@
public void run() throws Exception {
Harness harness =
new Harness()
- .noCheckpointing()
Review comment:
Is this on purpose?
----------------------------------------------------------------
[GitHub] [flink-statefun] sjwiesman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382140387
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+import org.apache.flink.util.InstantiationUtil;
+
+/** Configuration that captures all stateful function related settings. */
+@SuppressWarnings("WeakerAccess")
+public class StatefulFunctionsConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String MODULE_CONFIG_PREFIX = "statefun.module.config.";
+
+ // This configuration option exists for the documentation generator
+ @SuppressWarnings("unused")
+ public static final ConfigOption<String> MODULE_GLOBAL_DEFAULT =
+ ConfigOptions.key(MODULE_CONFIG_PREFIX + "<KEY>")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Adds the given key/value pair to the Stateful Functions global configuration.")
+ .text(
+ "These values will be available via the `globalConfigurations` parameter of StatefulFunctionModule#configure.")
+ .linebreak()
+ .text(
+ "Only the key <KEY> and value are added to the configuration. If the key/value pairs")
+ .list(
+ code(MODULE_CONFIG_PREFIX + "key1: value1"),
+ code(MODULE_CONFIG_PREFIX + "key2: value2"))
+ .text("are set, then the map")
+ .list(code("key1: value1"), code("key2: value2"))
+ .text("will be made available to your module at runtime.")
+ .build());
+
+ public static final ConfigOption<MessageFactoryType> USER_MESSAGE_SERIALIZER =
+ ConfigOptions.key("statefun.message.serializer")
Review comment:
Take a look at `StatefulFunctionsConfigTest` and my doc updates
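The prefix handling described in the quoted option description can be sketched with plain maps. This is only an illustration of the documented behavior, not the code in `StatefulFunctionsConfig`: the class and method names below are hypothetical, and a `Map<String, String>` stands in for Flink's `Configuration`.

```java
import java.util.HashMap;
import java.util.Map;

class ModuleConfigPrefixSketch {

  // Same prefix value as in the quoted diff.
  static final String MODULE_CONFIG_PREFIX = "statefun.module.config.";

  /** Keeps only prefixed entries and strips the prefix from their keys. */
  static Map<String, String> extractGlobalConfigurations(Map<String, String> flinkConf) {
    Map<String, String> globalConfigurations = new HashMap<>();
    for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(MODULE_CONFIG_PREFIX)) {
        String userKey = key.substring(MODULE_CONFIG_PREFIX.length());
        globalConfigurations.put(userKey, entry.getValue());
      }
    }
    return globalConfigurations;
  }

  public static void main(String[] args) {
    Map<String, String> flinkConf = new HashMap<>();
    flinkConf.put(MODULE_CONFIG_PREFIX + "key1", "value1");
    flinkConf.put(MODULE_CONFIG_PREFIX + "key2", "value2");
    flinkConf.put("statefun.flink-job-name", "StatefulFunctions");

    Map<String, String> globals = extractGlobalConfigurations(flinkConf);
    System.out.println(globals.get("key1")); // value1
    System.out.println(globals.get("key2")); // value2
    System.out.println(globals.containsKey("statefun.flink-job-name")); // false
  }
}
```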
----------------------------------------------------------------
[GitHub] [flink-statefun] tzulitai commented on issue #27:
[FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-589663623
I can confirm that there's an issue with the `StreamExecutionEnvironment#getConfiguration` reflection: `NoSuchMethodError` is being thrown both in a manual run of the greeter example and in the e2e test in #28.
Currently looking into it. From the logs it's confirmed that the correct Flink version (1.10.0) is being used.
----------------------------------------------------------------
[GitHub] [flink-statefun] tzulitai commented on issue #27:
[FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-589676426
Can confirm it is fixed after changing to `getDeclaredMethod` 😄
Both the e2e test in #28 and the manual greeter example are working.
+1, merging this. Thanks @sjwiesman @igalshilman!
----------------------------------------------------------------
[GitHub] [flink-statefun] igalshilman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381935261
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
##########
@@ -17,59 +17,62 @@
*/
package org.apache.flink.statefun.flink.core;
+import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+@SuppressWarnings("JavaReflectionMemberAccess")
public class StatefulFunctionsJob {
+ private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+ private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
public static void main(String... args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
- Configuration configuration = parameterTool.getConfiguration();
+ Map<String, String> globalConfigurations = parameterTool.toMap();
+
+ String configDirectory = getConfigurationDirectoryFromEnv();
+ Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+ StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+ stateFunConfig.setGlobalConfigurations(globalConfigurations);
Review comment:
I would maybe call that method `mergeGlobalConfigurationsWith(..)`,
or any other name that conveys that it is actually a union of what is already parsed and the argument.
----------------------------------------------------------------
[GitHub] [flink-statefun] igalshilman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r382100974
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
##########
@@ -17,59 +17,62 @@
*/
package org.apache.flink.statefun.flink.core;
+import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.statefun.flink.core.common.ConfigurationUtil;
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+@SuppressWarnings("JavaReflectionMemberAccess")
public class StatefulFunctionsJob {
+ private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
+ private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
+
public static void main(String... args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
- Configuration configuration = parameterTool.getConfiguration();
+ Map<String, String> globalConfigurations = parameterTool.toMap();
+
+ String configDirectory = getConfigurationDirectoryFromEnv();
+ Configuration flinkConf = GlobalConfiguration.loadConfiguration(configDirectory);
+ StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConf);
+ stateFunConfig.setGlobalConfigurations(globalConfigurations);
+ stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
- main(configuration);
+ main(stateFunConfig, new Configuration());
}
- public static void main(Configuration configuration) throws Exception {
- Objects.requireNonNull(configuration);
+ public static void main(StatefulFunctionsConfig stateFunConfig, Configuration flinkConf)
+ throws Exception {
+ Objects.requireNonNull(stateFunConfig);
+ Objects.requireNonNull(flinkConf);
setDefaultContextClassLoaderIfAbsent();
- setDefaultProviderIfAbsent(
- configuration, new StatefulFunctionsUniverses.ClassPathUniverseProvider());
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.configure(flinkConf, Thread.currentThread().getContextClassLoader());
+
+ env.getConfig().enableObjectReuse();
final StatefulFunctionsUniverse statefulFunctionsUniverse =
StatefulFunctionsUniverses.get(
- Thread.currentThread().getContextClassLoader(), configuration);
+ Thread.currentThread().getContextClassLoader(), stateFunConfig);
final StatefulFunctionsUniverseValidator statefulFunctionsUniverseValidator =
new StatefulFunctionsUniverseValidator();
statefulFunctionsUniverseValidator.validate(statefulFunctionsUniverse);
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- setDefaultConfiguration(configuration, env);
-
- FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse);
+ FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse, stateFunConfig);
flinkUniverse.configure(env);
- String jobName = configuration.getValue(StatefulFunctionsJobConstants.FLINK_JOB_NAME);
- env.execute(jobName);
- }
-
- private static void setDefaultConfiguration(
- Configuration configuration, StreamExecutionEnvironment env) {
- env.getConfig().setGlobalJobParameters(configuration);
- env.getConfig().enableObjectReuse();
- final long checkpointingInterval =
- configuration.getLong(StatefulFunctionsJobConstants.CHECKPOINTING_INTERVAL);
- if (checkpointingInterval > 0) {
- env.enableCheckpointing(checkpointingInterval);
Review comment:
That is great, thanks for clarifying.
----------------------------------------------------------------
[GitHub] [flink-statefun] sjwiesman edited a comment on issue #27:
[FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
sjwiesman edited a comment on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-589215387
@igalshilman and I discussed this offline.
Instead of discovering the flink-conf via `GlobalConfiguration`, we are accessing it from `StreamExecutionEnvironment#getConfiguration`. This does require reflection as the method is protected, but we are guaranteed to successfully access the config regardless of deployment method (image, standalone, session cluster, etc). Because we only use the returned config object in a read-only manner, we deem this safe.
The call to `StreamExecutionEnvironment#configure` allows us to modify the configuration values at runtime. In doing so we can expose all configurations to Harness users via `Harness#withConfiguration`. This is the same pattern used by the TableConfig in the Table API to expose configurations without exposing the underlying stream execution environment. Only keys that are set get overridden. If a key is not set, its existing value remains. This method is public.
I've updated the PR based on this discussion and the other miscellaneous comments in this thread.
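The override semantics described above (only keys that are set get overridden, everything else keeps its existing value) can be illustrated with plain maps. This is a minimal sketch under that assumption: it does not call `StreamExecutionEnvironment#configure`, and `overlay` is a hypothetical helper.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigOverlaySketch {

  /** Returns a copy of the existing values in which only the explicitly set keys are replaced. */
  static Map<String, String> overlay(Map<String, String> existing, Map<String, String> overrides) {
    Map<String, String> merged = new HashMap<>(existing);
    merged.putAll(overrides);
    return merged;
  }

  public static void main(String[] args) {
    // Values as they might come from the flink-conf.
    Map<String, String> existing = new HashMap<>();
    existing.put("statefun.flink-job-name", "StatefulFunctions");
    existing.put("statefun.message.serializer", "WITH_PROTOBUF_PAYLOADS");

    // A test harness overrides a single key.
    Map<String, String> overrides = new HashMap<>();
    overrides.put("statefun.flink-job-name", "harness-test");

    Map<String, String> effective = overlay(existing, overrides);
    System.out.println(effective.get("statefun.flink-job-name")); // harness-test
    System.out.println(effective.get("statefun.message.serializer")); // WITH_PROTOBUF_PAYLOADS
  }
}
```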
----------------------------------------------------------------
[GitHub] [flink-statefun] tzulitai closed pull request #27:
[FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
tzulitai closed pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27
----------------------------------------------------------------
[GitHub] [flink-statefun] igalshilman commented on a change in pull request
#27: [FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#discussion_r381956919
##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
##########
@@ -51,15 +49,19 @@
// -- configuration
private final Map<EgressIdentifier<?>, OutputTag<Object>> sideOutputs;
+ private final StatefulFunctionsConfig configuration;
+
// -- runtime
private transient Reductions reductions;
private transient MailboxExecutor mailboxExecutor;
FunctionGroupOperator(
Map<EgressIdentifier<?>, OutputTag<Object>> sideOutputs,
+ StatefulFunctionsConfig configuration,
MailboxExecutor mailboxExecutor,
ChainingStrategy chainingStrategy) {
this.sideOutputs = Objects.requireNonNull(sideOutputs);
+ this.configuration = configuration;
Review comment:
Can you add `Objects.requireNonNull`?
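A minimal sketch of the requested null check, using a hypothetical `OperatorSketch` class rather than the real `FunctionGroupOperator`. The point is that the constructor fails fast instead of deferring a `NullPointerException` to the first use of the field at runtime.

```java
import java.util.Objects;

class OperatorSketch {

  // Stand-in for the StatefulFunctionsConfig field in the quoted diff.
  private final Object configuration;

  OperatorSketch(Object configuration) {
    // Throws NullPointerException immediately if the argument is null.
    this.configuration = Objects.requireNonNull(configuration, "configuration");
  }

  Object configuration() {
    return configuration;
  }

  /** Returns true if constructing the operator with a null configuration is rejected. */
  static boolean rejectsNull() {
    try {
      new OperatorSketch(null);
      return false;
    } catch (NullPointerException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(rejectsNull()); // true
    System.out.println(new OperatorSketch("config").configuration()); // config
  }
}
```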
----------------------------------------------------------------
[GitHub] [flink-statefun] tzulitai commented on issue #27:
[FLINK-16149][core] Set configurations using
StreamExecutionEnvironment#getConfiguration
Posted by GitBox <gi...@apache.org>.
tzulitai commented on issue #27: [FLINK-16149][core] Set configurations using StreamExecutionEnvironment#getConfiguration
URL: https://github.com/apache/flink-statefun/pull/27#issuecomment-589583732
Thanks for the work and reviews @sjwiesman @igalshilman.
Just a quick update from my side:
I've been trying to verify the changes here, and am seeing test failures with the end-to-end test introduced in #28. I'm currently checking whether it's just something funky with that new end-to-end test and unrelated to the changes in this PR.
Otherwise, I had a look over the changes and they look good to me!
----------------------------------------------------------------