You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/05/17 15:39:08 UTC

[flink] 06/11: [FLINK-17407] Introduce config options for external resource framework

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e725526d2542454df88a88ac849501a09127324f
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Wed Apr 1 11:33:57 2020 +0800

    [FLINK-17407] Introduce config options for external resource framework
---
 .../generated/external_resource_configuration.html |  36 +++++
 .../generated/kubernetes_config_configuration.html |   6 +
 .../generated/yarn_config_configuration.html       |   6 +
 .../configuration/ExternalResourceOptions.java     | 157 +++++++++++++++++++++
 .../configuration/KubernetesConfigOptions.java     |  18 +++
 .../yarn/configuration/YarnConfigOptions.java      |  18 +++
 6 files changed, 241 insertions(+)

diff --git a/docs/_includes/generated/external_resource_configuration.html b/docs/_includes/generated/external_resource_configuration.html
new file mode 100644
index 0000000..f42c4c9
--- /dev/null
+++ b/docs/_includes/generated/external_resource_configuration.html
@@ -0,0 +1,36 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>external-resource.&lt;resource_name&gt;.amount</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>The amount for the external resource specified by &lt;resource_name&gt; per TaskExecutor.</td>
+        </tr>
+        <tr>
+            <td><h5>external-resource.&lt;resource_name&gt;.driver-factory.class</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Defines the factory class name for the external resource identified by &lt;resource_name&gt;. The factory will be used to instantiated the ExternalResourceDriver at the TaskExecutor side. For example, org.apache.flink.externalresource.gpu.GPUDriverFactory</td>
+        </tr>
+        <tr>
+            <td><h5>external-resource.&lt;resource_name&gt;.param.&lt;param&gt;</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The naming pattern of custom config options for the external resource specified by &lt;resource_name&gt;. Only the configurations that follow this pattern would be passed into the driver factory of that external resource.</td>
+        </tr>
+        <tr>
+            <td><h5>external-resources</h5></td>
+            <td style="word-wrap: break-word;"></td>
+            <td>List&lt;String&gt;</td>
+            <td>List of the &lt;resource_name&gt; of all external resources with delimiter ";", e.g. "gpu;fpga" for two external resource gpu and fpga. The &lt;resource_name&gt; will be used to splice related config options for external resource. Only the &lt;resource_name&gt; defined here will go into effect by external resource framework.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/docs/_includes/generated/kubernetes_config_configuration.html b/docs/_includes/generated/kubernetes_config_configuration.html
index 4183967..6b19cec 100644
--- a/docs/_includes/generated/kubernetes_config_configuration.html
+++ b/docs/_includes/generated/kubernetes_config_configuration.html
@@ -9,6 +9,12 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>external-resource.&lt;resource_name&gt;.kubernetes.config-key</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>If configured, Flink will add "resources.limits.&lt;config-key&gt;" and "resources.requests.&lt;config-key&gt;" to the main container of TaskExecutor and set the value to the value of external-resource.&lt;resource_name&gt;.amount.</td>
+        </tr>
+        <tr>
             <td><h5>kubernetes.cluster-id</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index 8c5f909..3f4a2d1 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -9,6 +9,12 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>external-resource.&lt;resource_name&gt;.yarn.config-key</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>If configured, Flink will add this key to the resource profile of container request to Yarn. The value will be set to the value of external-resource.&lt;resource_name&gt;.amount.</td>
+        </tr>
+        <tr>
             <td><h5>yarn.application-attempt-failures-validity-interval</h5></td>
             <td style="word-wrap: break-word;">10000</td>
             <td>Long</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
new file mode 100644
index 0000000..47dc2c8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+	/** Key suffix for the amount of an external resource per TaskExecutor. Appended to the resource prefix to form the actual config key. */
+	public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount";
+
+	/** Key suffix for the driver factory class of an external resource. Appended to the resource prefix to form the actual config key. */
+	public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = "driver-factory.class";
+
+	/** Key suffix which, appended to the resource prefix, forms the common prefix of the custom config options of an external resource. */
+	public static final String EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX = "param.";
+
+	/** The naming pattern of custom config options for the external resource, with {@code <param>} as a placeholder. This is used as a suffix. */
+	private static final String EXTERNAL_RESOURCE_DRIVER_PARAM_PATTERN_SUFFIX = EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX + "<param>";
+
+	/**
+	 * The prefix for all external resources configs. Has to be combined with a resource name and
+	 * one of the suffixes defined above.
+	 */
+	private static final String EXTERNAL_RESOURCE_PREFIX = "external-resource";
+
+	/**
+	 * List of the resource_name of all external resources with delimiter ";", e.g. "gpu;fpga" for the two external resources gpu and fpga.
+	 * The resource_name will be used to splice related config options for external resource. Only the resource_name defined here will
+	 * go into effect in external resource framework.
+	 *
+	 * <p>Example:
+	 * <pre>{@code
+	 * external-resources: gpu;fpga
+	 *
+	 * external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
+	 * external-resource.gpu.amount: 2
+	 * external-resource.gpu.param.type: nvidia
+	 *
+	 * external-resource.fpga.driver-factory.class: org.apache.flink.externalresource.fpga.FPGADriverFactory
+	 * external-resource.fpga.amount: 1
+	 * }</pre>
+	 */
+	public static final ConfigOption<List<String>> EXTERNAL_RESOURCE_LIST =
+		key("external-resources")
+			.stringType()
+			.asList()
+			.defaultValues()
+			.withDescription("List of the <resource_name> of all external resources with delimiter \";\", e.g. \"gpu;fpga\" " +
+				"for two external resource gpu and fpga. The <resource_name> will be used to splice related config options for " +
+				"external resource. Only the <resource_name> defined here will go into effect by external resource framework.");
+
+	/**
+	 * Defines the factory class name for the external resource identified by &lt;resource_name&gt;. The factory will be used
+	 * to instantiate the {@link org.apache.flink.api.common.externalresource.ExternalResourceDriver} at the TaskExecutor side.
+	 *
+	 * <p>It is intentionally included in the user docs although it is unused in code.
+	 */
+	@SuppressWarnings("unused")
+	public static final ConfigOption<String> EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS =
+		key(genericKeyWithSuffix(EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Defines the factory class name for the external resource identified by <resource_name>. The " +
+				"factory will be used to instantiated the ExternalResourceDriver at the TaskExecutor side. For example, " +
+				"org.apache.flink.externalresource.gpu.GPUDriverFactory");
+
+	/**
+	 * The amount for the external resource specified by &lt;resource_name&gt; per TaskExecutor.
+	 *
+	 * <p>It is intentionally included in the user docs; per-resource keys are built by {@link #getAmountConfigOptionForResource(String)}.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final ConfigOption<Long> EXTERNAL_RESOURCE_AMOUNT =
+		key(genericKeyWithSuffix(EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+			.longType()
+			.noDefaultValue()
+			.withDescription("The amount for the external resource specified by <resource_name> per TaskExecutor.");
+
+	/**
+	 * The naming pattern of custom config options for the external resource specified by &lt;resource_name&gt;.
+	 * Only the configurations that follow this pattern would be passed into the driver factory of that external resource.
+	 *
+	 * <p>It is intentionally included in the user docs although it is unused in code.
+	 */
+	@SuppressWarnings("unused")
+	public static final ConfigOption<String> EXTERNAL_RESOURCE_DRIVER_PARAM =
+		key(genericKeyWithSuffix(EXTERNAL_RESOURCE_DRIVER_PARAM_PATTERN_SUFFIX))
+			.stringType()
+			.noDefaultValue()
+			.withDescription("The naming pattern of custom config options for the external resource specified by <resource_name>. " +
+				"Only the configurations that follow this pattern would be passed into the driver factory of that external resource.");
+
+	public static String genericKeyWithSuffix(String suffix) {
+		return keyWithResourceNameAndSuffix("<resource_name>", suffix);
+	}
+
+	/**
+	 * Generate the config option key {@code external-resource.<resourceName>.<suffix>}; both arguments are null-checked.
+	 */
+	private static String keyWithResourceNameAndSuffix(String resourceName, String suffix) {
+		return String.format("%s.%s.%s", EXTERNAL_RESOURCE_PREFIX, Preconditions.checkNotNull(resourceName), Preconditions.checkNotNull(suffix));
+	}
+
+	/**
+	 * Generate the config option key for the amount of the external resource with the given resource_name.
+	 */
+	public static String getAmountConfigOptionForResource(String resourceName) {
+		return keyWithResourceNameAndSuffix(resourceName, EXTERNAL_RESOURCE_AMOUNT_SUFFIX);
+	}
+
+	/**
+	 * Generate the config option key for the configuration key of the external resource in the deploying system, using the given suffix.
+	 */
+	public static String getSystemConfigKeyConfigOptionForResource(String resourceName, String suffix) {
+		return keyWithResourceNameAndSuffix(resourceName, suffix);
+	}
+
+	/**
+	 * Generate the config option key for the factory class name of {@link org.apache.flink.api.common.externalresource.ExternalResourceDriver}.
+	 */
+	public static String getExternalResourceDriverFactoryConfigOptionForResource(String resourceName) {
+		return keyWithResourceNameAndSuffix(resourceName, EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX);
+	}
+
+	/**
+	 * Generate the config option key prefix for the user-defined params of the external resource with the given resource_name.
+	 */
+	public static String getExternalResourceParamConfigPrefixForResource(String resourceName) {
+		return keyWithResourceNameAndSuffix(resourceName, EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX);
+	}
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index 6b1656c..0c4303f 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ExternalResourceOptions;
 
 import java.util.List;
 import java.util.Map;
@@ -211,6 +212,23 @@ public class KubernetesConfigOptions {
 			.withDescription("The user-specified annotations that are set to the rest Service. The value should be " +
 				"in the form of a1:v1,a2:v2");
 
+	/** Defines the configuration key of that external resource in Kubernetes. This is used as a suffix in an actual config. */
+	public static final String EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";
+
+	/**
+	 * If configured, Flink will add "resources.limits.&lt;config-key&gt;" and "resources.requests.&lt;config-key&gt;" to the main
+	 * container of TaskExecutor and set the value to the value of {@link ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+	 *
+	 * <p>It is intentionally included in the user docs although it is unused in code.
+	 */
+	@SuppressWarnings("unused")
+	public static final ConfigOption<String> EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY =
+		key(ExternalResourceOptions.genericKeyWithSuffix(EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX))
+			.stringType()
+			.noDefaultValue()
+			.withDescription("If configured, Flink will add \"resources.limits.<config-key>\" and \"resources.requests.<config-key>\" " +
+				"to the main container of TaskExecutor and set the value to the value of " + ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT.key() + ".");
+
 	/**
 	 * The flink rest service exposed type.
 	 */
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 02e71d0..14ad257 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn.configuration;
 
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ExternalResourceOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.description.Description;
 
@@ -271,6 +272,23 @@ public class YarnConfigOptions {
 				"they doesn't need to be downloaded every time for each application. An example could be " +
 				"hdfs://$namenode_address/path/of/flink/lib");
 
+	/** Defines the configuration key of that external resource in Yarn. This is used as a suffix in an actual config. */
+	public static final String EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX = "yarn.config-key";
+
+	/**
+	 * If configured, Flink will add this key to the resource profile of container request to Yarn. The value will be
+	 * set to the value of {@link ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+	 *
+	 * <p>It is intentionally included in the user docs although it is unused in code.
+	 */
+	@SuppressWarnings("unused")
+	public static final ConfigOption<String> EXTERNAL_RESOURCE_YARN_CONFIG_KEY =
+		key(ExternalResourceOptions.genericKeyWithSuffix(EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX))
+			.stringType()
+			.noDefaultValue()
+			.withDescription("If configured, Flink will add this key to the resource profile of container request to Yarn. " +
+				"The value will be set to the value of " + ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT.key() + ".");
+
 	// ------------------------------------------------------------------------
 
 	/** This class is not meant to be instantiated. */