You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/05/17 15:39:08 UTC
[flink] 06/11: [FLINK-17407] Introduce config options for external
resource framework
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e725526d2542454df88a88ac849501a09127324f
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Wed Apr 1 11:33:57 2020 +0800
[FLINK-17407] Introduce config options for external resource framework
---
.../generated/external_resource_configuration.html | 36 +++++
.../generated/kubernetes_config_configuration.html | 6 +
.../generated/yarn_config_configuration.html | 6 +
.../configuration/ExternalResourceOptions.java | 157 +++++++++++++++++++++
.../configuration/KubernetesConfigOptions.java | 18 +++
.../yarn/configuration/YarnConfigOptions.java | 18 +++
6 files changed, 241 insertions(+)
diff --git a/docs/_includes/generated/external_resource_configuration.html b/docs/_includes/generated/external_resource_configuration.html
new file mode 100644
index 0000000..f42c4c9
--- /dev/null
+++ b/docs/_includes/generated/external_resource_configuration.html
@@ -0,0 +1,36 @@
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>external-resource.<resource_name>.amount</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+ <td>The amount for the external resource specified by <resource_name> per TaskExecutor.</td>
+ </tr>
+ <tr>
+ <td><h5>external-resource.<resource_name>.driver-factory.class</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Defines the factory class name for the external resource identified by <resource_name>. The factory will be used to instantiated the ExternalResourceDriver at the TaskExecutor side. For example, org.apache.flink.externalresource.gpu.GPUDriverFactory</td>
+ </tr>
+ <tr>
+ <td><h5>external-resource.<resource_name>.param.<param></h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The naming pattern of custom config options for the external resource specified by <resource_name>. Only the configurations that follow this pattern would be passed into the driver factory of that external resource.</td>
+ </tr>
+ <tr>
+ <td><h5>external-resources</h5></td>
+ <td style="word-wrap: break-word;"></td>
+ <td>List<String></td>
+ <td>List of the <resource_name> of all external resources with delimiter ";", e.g. "gpu;fpga" for two external resource gpu and fpga. The <resource_name> will be used to splice related config options for external resource. Only the <resource_name> defined here will go into effect by external resource framework.</td>
+ </tr>
+ </tbody>
+</table>
diff --git a/docs/_includes/generated/kubernetes_config_configuration.html b/docs/_includes/generated/kubernetes_config_configuration.html
index 4183967..6b19cec 100644
--- a/docs/_includes/generated/kubernetes_config_configuration.html
+++ b/docs/_includes/generated/kubernetes_config_configuration.html
@@ -9,6 +9,12 @@
</thead>
<tbody>
<tr>
+ <td><h5>external-resource.<resource_name>.kubernetes.config-key</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>If configured, Flink will add "resources.limits.<config-key>" and "resources.requests.<config-key>" to the main container of TaskExecutor and set the value to the value of external-resource.<resource_name>.amount.</td>
+ </tr>
+ <tr>
<td><h5>kubernetes.cluster-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index 8c5f909..3f4a2d1 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -9,6 +9,12 @@
</thead>
<tbody>
<tr>
+ <td><h5>external-resource.<resource_name>.yarn.config-key</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>If configured, Flink will add this key to the resource profile of container request to Yarn. The value will be set to the value of external-resource.<resource_name>.amount.</td>
+ </tr>
+ <tr>
<td><h5>yarn.application-attempt-failures-validity-interval</h5></td>
<td style="word-wrap: break-word;">10000</td>
<td>Long</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
new file mode 100644
index 0000000..47dc2c8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+ /** The amount of the external resource per task executor. This is used as a suffix in an actual config. */
+ public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount";
+
+ /** The driver factory class of the external resource to use. This is used as a suffix in an actual config. */
+ public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = "driver-factory.class";
+
+ /** The suffix of custom config options' prefix for the external resource. */
+ public static final String EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX = "param.";
+
+ /** The naming pattern of custom config options for the external resource. This is used as a suffix. */
+ private static final String EXTERNAL_RESOURCE_DRIVER_PARAM_PATTERN_SUFFIX = EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX + "<param>";
+
+ /**
+ * The prefix for all external resources configs. Has to be combined with a resource name and
+ * the configs mentioned below.
+ */
+ private static final String EXTERNAL_RESOURCE_PREFIX = "external-resource";
+
+ /**
+ * List of the resource_name of all external resources with delimiter ";", e.g. "gpu;fpga" for the two external resources gpu and fpga.
+ * The resource_name will be used to splice related config options for the external resource. Only the resource_names defined here will
+ * take effect in the external resource framework.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * external-resources: gpu;fpga
+ *
+ * external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
+ * external-resource.gpu.amount: 2
+ * external-resource.gpu.param.type: nvidia
+ *
+ * external-resource.fpga.driver-factory.class: org.apache.flink.externalresource.fpga.FPGADriverFactory
+ * external-resource.fpga.amount: 1
+ * }</pre>
+ */
+ public static final ConfigOption<List<String>> EXTERNAL_RESOURCE_LIST =
+ key("external-resources")
+ .stringType()
+ .asList()
+ .defaultValues()
+ .withDescription("List of the <resource_name> of all external resources with delimiter \";\", e.g. \"gpu;fpga\" " +
+ "for two external resource gpu and fpga. The <resource_name> will be used to splice related config options for " +
+ "external resource. Only the <resource_name> defined here will go into effect by external resource framework.");
+
+ /**
+ * Defines the factory class name for the external resource identified by {@code <resource_name>}. The factory will be used
+ * to instantiate the {@link org.apache.flink.api.common.externalresource.ExternalResourceDriver} at the TaskExecutor side.
+ *
+ * <p>It is intentionally included into user docs while unused.
+ */
+ @SuppressWarnings("unused")
+ public static final ConfigOption<String> EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS =
+ key(genericKeyWithSuffix(EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Defines the factory class name for the external resource identified by <resource_name>. The " +
+ "factory will be used to instantiated the ExternalResourceDriver at the TaskExecutor side. For example, " +
+ "org.apache.flink.externalresource.gpu.GPUDriverFactory");
+
+ /**
+ * The amount for the external resource specified by {@code <resource_name>} per TaskExecutor.
+ *
+ * <p>It is intentionally included into user docs; only its generic key is referenced by other option descriptions.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final ConfigOption<Long> EXTERNAL_RESOURCE_AMOUNT =
+ key(genericKeyWithSuffix(EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+ .longType()
+ .noDefaultValue()
+ .withDescription("The amount for the external resource specified by <resource_name> per TaskExecutor.");
+
+ /**
+ * The naming pattern of custom config options for the external resource specified by {@code <resource_name>}.
+ * Only the configurations that follow this pattern would be passed into the driver factory of that external resource.
+ *
+ * <p>It is intentionally included into user docs while unused.
+ */
+ @SuppressWarnings("unused")
+ public static final ConfigOption<String> EXTERNAL_RESOURCE_DRIVER_PARAM =
+ key(genericKeyWithSuffix(EXTERNAL_RESOURCE_DRIVER_PARAM_PATTERN_SUFFIX))
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The naming pattern of custom config options for the external resource specified by <resource_name>. " +
+ "Only the configurations that follow this pattern would be passed into the driver factory of that external resource.");
+
+ public static String genericKeyWithSuffix(String suffix) {
+ return keyWithResourceNameAndSuffix("<resource_name>", suffix);
+ }
+
+ /**
+ * Generate the config option key with resource_name and suffix.
+ */
+ private static String keyWithResourceNameAndSuffix(String resourceName, String suffix) {
+ return String.format("%s.%s.%s", EXTERNAL_RESOURCE_PREFIX, Preconditions.checkNotNull(resourceName), Preconditions.checkNotNull(suffix));
+ }
+
+ /**
+ * Generate the config option key for the amount of external resource with resource_name.
+ */
+ public static String getAmountConfigOptionForResource(String resourceName) {
+ return keyWithResourceNameAndSuffix(resourceName, EXTERNAL_RESOURCE_AMOUNT_SUFFIX);
+ }
+
+ /**
+ * Generate the config option key for the configuration key of external resource in the deploying system.
+ */
+ public static String getSystemConfigKeyConfigOptionForResource(String resourceName, String suffix) {
+ return keyWithResourceNameAndSuffix(resourceName, suffix);
+ }
+
+ /**
+ * Generate the config option key for the factory class name of {@link org.apache.flink.api.common.externalresource.ExternalResourceDriver}.
+ */
+ public static String getExternalResourceDriverFactoryConfigOptionForResource(String resourceName) {
+ return keyWithResourceNameAndSuffix(resourceName, EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX);
+ }
+
+ /**
+ * Generate the config option key prefix for the user-defined params of the external resource with resource_name.
+ */
+ public static String getExternalResourceParamConfigPrefixForResource(String resourceName) {
+ return keyWithResourceNameAndSuffix(resourceName, EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX);
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index 6b1656c..0c4303f 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.configuration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ExternalResourceOptions;
import java.util.List;
import java.util.Map;
@@ -211,6 +212,23 @@ public class KubernetesConfigOptions {
.withDescription("The user-specified annotations that are set to the rest Service. The value should be " +
"in the form of a1:v1,a2:v2");
+ /** Defines the configuration key of that external resource in Kubernetes. This is used as a suffix in an actual config. */
+ public static final String EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";
+
+ /**
+ * If configured, Flink will add {@code resources.limits.<config-key>} and {@code resources.requests.<config-key>} to the main
+ * container of TaskExecutor and set the value to {@link ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+ *
+ * <p>It is intentionally included into user docs while unused.
+ */
+ @SuppressWarnings("unused")
+ public static final ConfigOption<String> EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY =
+ key(ExternalResourceOptions.genericKeyWithSuffix(EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX))
+ .stringType()
+ .noDefaultValue()
+ .withDescription("If configured, Flink will add \"resources.limits.<config-key>\" and \"resources.requests.<config-key>\" " +
+ "to the main container of TaskExecutor and set the value to the value of " + ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT.key() + ".");
+
/**
* The flink rest service exposed type.
*/
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 02e71d0..14ad257 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -19,6 +19,7 @@
package org.apache.flink.yarn.configuration;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ExternalResourceOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.description.Description;
@@ -271,6 +272,23 @@ public class YarnConfigOptions {
"they doesn't need to be downloaded every time for each application. An example could be " +
"hdfs://$namenode_address/path/of/flink/lib");
+ /** Defines the configuration key of that external resource in Yarn. This is used as a suffix in an actual config. */
+ public static final String EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX = "yarn.config-key";
+
+ /**
+ * If configured, Flink will add this key to the resource profile of container request to Yarn. The value will be
+ * set to {@link ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+ *
+ * <p>It is intentionally included into user docs while unused.
+ */
+ @SuppressWarnings("unused")
+ public static final ConfigOption<String> EXTERNAL_RESOURCE_YARN_CONFIG_KEY =
+ key(ExternalResourceOptions.genericKeyWithSuffix(EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX))
+ .stringType()
+ .noDefaultValue()
+ .withDescription("If configured, Flink will add this key to the resource profile of container request to Yarn. " +
+ "The value will be set to the value of " + ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT.key() + ".");
+
// ------------------------------------------------------------------------
/** This class is not meant to be instantiated. */