Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/06 06:26:18 UTC

[GitHub] [flink] xintongsong commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

xintongsong commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r420529485



##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
##########
@@ -71,13 +71,17 @@
 
 	protected FlinkKubeClient flinkKubeClient;
 
-	@Before
-	public void setup() throws Exception {
+	protected void setupFlinkConfig() {

Review comment:
       I assume the purpose of this hotfix commit is to make sure `flinkConfig` is always set up before it is used?
   This is not very obvious. It would be helpful to explain this briefly in the commit message.

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/externalresource/ExternalResourceInfo.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.externalresource;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collection;
+
+/**
+ * Contains the information of an external resource.
+ */
+@PublicEvolving
+public interface ExternalResourceInfo {
+
+	/**
+	 * Get the property indicated by the specified key.
+	 */
+	String getProperty(String key);

Review comment:
       What if the key doesn't exist? Do we return `null` or throw NPE? These behaviors should be explicitly described in javadocs.
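
   One way to settle the missing-key question is to encode it in the return type and spell it out in the Javadoc. The sketch below is illustrative only: `DocumentedExternalResourceInfo` and `MapBackedResourceInfo` are hypothetical names, and returning `Optional` is just one possible contract, not necessarily the one this PR should adopt.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical variant of the interface with the missing-key behavior documented. */
interface DocumentedExternalResourceInfo {

	/**
	 * Get the property indicated by the specified key.
	 *
	 * @param key of the required property, must not be null
	 * @return the value of the property, or an empty Optional if the key does not exist
	 * @throws NullPointerException if the key is null
	 */
	Optional<String> getProperty(String key);
}

/** Hypothetical map-backed implementation, used only to exercise the contract above. */
final class MapBackedResourceInfo implements DocumentedExternalResourceInfo {

	private final Map<String, String> properties;

	MapBackedResourceInfo(Map<String, String> properties) {
		// Defensive copy so later changes to the caller's map are not visible here.
		this.properties = new HashMap<>(properties);
	}

	@Override
	public Optional<String> getProperty(String key) {
		Objects.requireNonNull(key, "key must not be null");
		return Optional.ofNullable(properties.get(key));
	}
}
```

   With a contract like this, callers never see `null` for an unknown key, and a `null` key fails fast with a descriptive message.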

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/externalresource/ExternalResourceDriver.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.externalresource;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Set;
+
+/**
+ * Driver which takes the responsibility to manage and provide the information of external resource.
+ */
+@PublicEvolving
+public interface ExternalResourceDriver {
+
+	/**
+	 * Retrieve the information of the external resources according to the amount.
+	 */
+	Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount);

Review comment:
       Does this API possibly throw exceptions?

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+	/** The amount of the external resource per task executor. This is used as a suffix in an actual config. */
+	public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount";
+
+	/** The driver factory class of the external resource to use. This is used as a suffix in an actual config. */
+	public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = "driver-factory.class";
+
+	/** Defines the configuration key of that external resource in Yarn. This is used as a suffix in an actual config. */
+	public static final String EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX = "yarn.config-key";
+
+	/** Defines the configuration key of that external resource in Kubernetes. This is used as a suffix in an actual config. */
+	public static final String EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";
+
+	/** The naming pattern of custom config options for the external resource. This is used as a suffix. */
+	public static final String EXTERNAL_RESOURCE_DRIVER_PARAM_PATTERN_SUFFIX = "param.<param>";

Review comment:
       ```suggestion
   	private static final String EXTERNAL_RESOURCE_DRIVER_PARAM_PATTERN_SUFFIX = EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX + "<param>";
   ```
   We would need to move this to after `EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX`.
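
   The reordering matters because Java rejects a static field initializer that refers, by simple name, to a static field declared later in the same class ("illegal forward reference"). A minimal sketch of the intended order follows; the class name `ParamSuffixOrderingSketch` is hypothetical, and the value `"param."` is an assumption inferred from the original `"param.<param>"` literal.

```java
/** Hypothetical holder class showing the declaration order the suggestion requires. */
final class ParamSuffixOrderingSketch {

	// Assumed value: the prefix part of the original "param.<param>" pattern.
	static final String EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX = "param.";

	// Must be declared after the constant it is built from.
	static final String EXTERNAL_RESOURCE_DRIVER_PARAM_PATTERN_SUFFIX =
		EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX + "<param>";

	private ParamSuffixOrderingSketch() {
	}
}
```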

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+	private ExternalResourceUtils() {
+		throw new UnsupportedOperationException("This class should never be instantiated.");
+	}
+
+	/**
+	 * Get the enabled external resource list from configuration.
+	 */
+	private static Set<String> getExternalResourceSet(Configuration config) {
+		return new HashSet<>(ConfigUtils.decodeListFromConfig(config, ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, x -> x));
+	}
+
+	/**
+	 * Get the external resources map for Kubernetes. The key should be used for deployment specific container request,
+	 * and values should be the amount of that resource.
+	 */
+	public static Map<String, Long> getExternalResourcesForKubernetes(Configuration config) {
+		return getExternalResources(config, ExternalResourceOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX);
+	}
+
+	/**
+	 * Get the external resources map for Yarn. The key should be used for deployment specific container request,
+	 * and values should be the amount of that resource.
+	 */
+	public static Map<String, Long> getExternalResourcesForYarn(Configuration config) {
+		return getExternalResources(config, ExternalResourceOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX);
+	}
+
+	/**
+	 * Get the external resources map.
+	 */
+	@VisibleForTesting
+	static Map<String, Long> getExternalResources(Configuration config, String suffix) {
+		final Set<String> resourceSet = getExternalResourceSet(config);
+		LOG.info("Enabled external resources: {}", resourceSet);
+
+		if (resourceSet.isEmpty()) {
+			return Collections.emptyMap();
+		}
+
+		final Map<String, Long> externalResourceConfigs = new HashMap<>();
+		for (String resourceName: resourceSet) {
+			final ConfigOption<Long> amountOption =
+				key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+					.longType()
+					.noDefaultValue();
+			final ConfigOption<String> configKeyOption =
+				key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, suffix))
+					.stringType()
+					.noDefaultValue();
+			final String configKey = config.getString(configKeyOption);
+			final Optional<Long> amount = config.getOptional(amountOption);
+
+			if (StringUtils.isNullOrWhitespaceOnly(configKey)) {
+				LOG.warn("Could not find valid {} for {}. Will ignore that resource.", configKeyOption.key(), resourceName);
+				continue;
+			}
+			if (!amount.isPresent() || amount.get() <= 0) {
+				LOG.warn("The amount of the {} should be configured and the value should be positive. Will ignore that resource.", resourceName);

Review comment:
       Better to also log the current status: is the amount configured, and if so, what value is it currently configured to?
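
   As a rough illustration, the two failure cases could produce different messages. The helper below is a hypothetical sketch (`AmountWarningSketch` and `describeInvalidAmount` are not part of the PR); it only builds the message text so that it stays independent of the logging framework.

```java
import java.util.Optional;

/** Hypothetical helper that builds a warning describing why an amount was rejected. */
final class AmountWarningSketch {

	private AmountWarningSketch() {
	}

	static String describeInvalidAmount(String resourceName, Optional<Long> amountOpt) {
		if (!amountOpt.isPresent()) {
			// Case 1: the amount option is missing entirely.
			return String.format(
				"The amount of external resource %s is not configured. Will ignore that resource.",
				resourceName);
		}
		// Case 2: the amount is configured, but the value is not positive.
		return String.format(
			"The amount of external resource %s is configured to %d, but it should be positive. Will ignore that resource.",
			resourceName,
			amountOpt.get());
	}
}
```

   The returned string could then be passed to `LOG.warn(...)` at the point where the resource is skipped.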

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+	/** The amount of the external resource per task executor. This is used as a suffix in an actual config. */
+	public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount";
+
+	/** The driver factory class of the external resource to use. This is used as a suffix in an actual config. */
+	public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = "driver-factory.class";
+
+	/** Defines the configuration key of that external resource in Yarn. This is used as a suffix in an actual config. */
+	public static final String EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX = "yarn.config-key";
+
+	/** Defines the configuration key of that external resource in Kubernetes. This is used as a suffix in an actual config. */
+	public static final String EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";
+
+	/** The naming pattern of custom config options for the external resource. This is used as a suffix. */
+	public static final String EXTERNAL_RESOURCE_DRIVER_PARAM_PATTERN_SUFFIX = "param.<param>";
+
+	/**
+	 * The prefix for all external resources configs. Has to be combined with a resource name and
+	 * the configs mentioned below.
+	 */
+	private static final String EXTERNAL_RESOURCE_PREFIX = "external-resource";
+
+	/**
+	 * List of the resource_name of all external resources with delimiter ";". The resource_name will be used to
+	 * splice related config options for external resource. Only the resource_name defined here will go into effect in external
+	 * resource framework.
+	 */
+	public static final ConfigOption<List<String>> EXTERNAL_RESOURCE_LIST =
+		key("external-resources")
+			.stringType()
+			.asList()
+			.noDefaultValue()
+			.withDescription("List of the <resource_name> of all external resources with delimiter \";\". " +
+				"The <resource_name> will be used to splice related config options for external resource. Only the " +
+				"<resource_name> defined here will go into effect by external resource framework.");
+
+	/**
+	 * Defines the factory class name for the external resource identified by &gt;resource_name&lt;. The factory will be used
+	 * to instantiated the {@link org.apache.flink.api.common.externalresource.ExternalResourceDriver} at the TaskExecutor side.
+	 *
+	 * <p>It is intentionally included into user docs while unused.
+	 */
+	@SuppressWarnings("unused")
+	public static final ConfigOption<String> EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS =
+		key(genericKeyWithSuffix(EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Defines the factory class name for the external resource identified by <resource_name>. The " +
+				"factory will be used to instantiated the ExternalResourceDriver at the TaskExecutor side.");
+
+	/**
+	 * The amount for the external resource specified by &gt;resource_name&lt; per TaskExecutor.
+	 *
+	 * <p>It is intentionally included into user docs while unused.
+	 */
+	@SuppressWarnings("unused")

Review comment:
       ```suggestion
   	@SuppressWarnings("WeakerAccess")
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+	private ExternalResourceUtils() {
+		throw new UnsupportedOperationException("This class should never be instantiated.");
+	}
+
+	/**
+	 * Get the enabled external resource list from configuration.
+	 */
+	private static Set<String> getExternalResourceSet(Configuration config) {
+		return new HashSet<>(ConfigUtils.decodeListFromConfig(config, ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, x -> x));
+	}
+
+	/**
+	 * Get the external resources map for Kubernetes. The key should be used for deployment specific container request,
+	 * and values should be the amount of that resource.
+	 */
+	public static Map<String, Long> getExternalResourcesForKubernetes(Configuration config) {
+		return getExternalResources(config, ExternalResourceOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX);
+	}
+
+	/**
+	 * Get the external resources map for Yarn. The key should be used for deployment specific container request,
+	 * and values should be the amount of that resource.
+	 */
+	public static Map<String, Long> getExternalResourcesForYarn(Configuration config) {
+		return getExternalResources(config, ExternalResourceOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX);
+	}
+
+	/**
+	 * Get the external resources map.
+	 */
+	@VisibleForTesting
+	static Map<String, Long> getExternalResources(Configuration config, String suffix) {
+		final Set<String> resourceSet = getExternalResourceSet(config);
+		LOG.info("Enabled external resources: {}", resourceSet);
+
+		if (resourceSet.isEmpty()) {
+			return Collections.emptyMap();
+		}
+
+		final Map<String, Long> externalResourceConfigs = new HashMap<>();
+		for (String resourceName: resourceSet) {
+			final ConfigOption<Long> amountOption =
+				key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+					.longType()
+					.noDefaultValue();
+			final ConfigOption<String> configKeyOption =
+				key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, suffix))
+					.stringType()
+					.noDefaultValue();
+			final String configKey = config.getString(configKeyOption);
+			final Optional<Long> amount = config.getOptional(amountOption);
+
+			if (StringUtils.isNullOrWhitespaceOnly(configKey)) {
+				LOG.warn("Could not find valid {} for {}. Will ignore that resource.", configKeyOption.key(), resourceName);
+				continue;
+			}
+			if (!amount.isPresent() || amount.get() <= 0) {
+				LOG.warn("The amount of the {} should be configured and the value should be positive. Will ignore that resource.", resourceName);
+				continue;
+			}
+
+			if (externalResourceConfigs.put(configKey, amount.get()) != null) {
+				LOG.warn("Duplicate config key {} occurred for external resources, the one with amount {} will overwrite the value.", configKey, amount);
+			} else {
+				LOG.info("Add external resources configs with key {} value {}.", configKey, amount);

Review comment:
       Better to mention the resource name in logs.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+	private ExternalResourceUtils() {
+		throw new UnsupportedOperationException("This class should never be instantiated.");
+	}
+
+	/**
+	 * Get the enabled external resource list from configuration.
+	 */
+	private static Set<String> getExternalResourceSet(Configuration config) {
+		return new HashSet<>(ConfigUtils.decodeListFromConfig(config, ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, x -> x));
+	}
+
+	/**
+	 * Get the external resources map for Kubernetes. The key should be used for deployment specific container request,
+	 * and values should be the amount of that resource.
+	 */
+	public static Map<String, Long> getExternalResourcesForKubernetes(Configuration config) {
+		return getExternalResources(config, ExternalResourceOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX);
+	}
+
+	/**
+	 * Get the external resources map for Yarn. The key should be used for deployment specific container request,
+	 * and values should be the amount of that resource.
+	 */
+	public static Map<String, Long> getExternalResourcesForYarn(Configuration config) {
+		return getExternalResources(config, ExternalResourceOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX);
+	}
+
+	/**
+	 * Get the external resources map.
+	 */
+	@VisibleForTesting
+	static Map<String, Long> getExternalResources(Configuration config, String suffix) {
+		final Set<String> resourceSet = getExternalResourceSet(config);
+		LOG.info("Enabled external resources: {}", resourceSet);
+
+		if (resourceSet.isEmpty()) {
+			return Collections.emptyMap();
+		}
+
+		final Map<String, Long> externalResourceConfigs = new HashMap<>();
+		for (String resourceName: resourceSet) {
+			final ConfigOption<Long> amountOption =
+				key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+					.longType()
+					.noDefaultValue();
+			final ConfigOption<String> configKeyOption =
+				key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, suffix))
+					.stringType()
+					.noDefaultValue();
+			final String configKey = config.getString(configKeyOption);
+			final Optional<Long> amount = config.getOptional(amountOption);

Review comment:
       nit: might be better to name the variable `amountOpt` to hint the type.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.1+ or 2.10+.
+ */
+class ResourceInformationReflector {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ResourceInformationReflector.class);
+
+	static final ResourceInformationReflector INSTANCE = new ResourceInformationReflector();
+
+	/** Class used to set the extended resource. */
+	private static final String RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+	private final Method resourceSetResourceInformationMethod;
+	private final Method resourceGetResourcesMethod;
+	private final Method resourceInformationGetNameMethod;
+	private final Method resourceInformationGetValueMethod;
+	private final Method resourceInformationNewInstanceMethod;

Review comment:
       We should annotate these as `@Nullable`, and comment that these can be `null` only if `isYarnResourceTypesAvailable` is `false`.
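
   A reduced sketch of that pattern is shown below. It is not the reflector from this PR: `NullableReflectorSketch` is a hypothetical class that looks up `length()` on an arbitrary class by name, and the nested `Nullable` annotation is only a stand-in for `javax.annotation.Nullable` so that the example compiles without extra dependencies.

```java
import java.lang.reflect.Method;

/** Hypothetical, reduced version of a reflector whose Method fields may be null. */
final class NullableReflectorSketch {

	/** Stand-in for javax.annotation.Nullable, declared locally to stay self-contained. */
	@interface Nullable {
	}

	/** Can be null only if {@link #isAvailable} is false. */
	@Nullable
	private final Method lengthMethod;

	private final boolean isAvailable;

	NullableReflectorSketch(String className) {
		Method method = null;
		boolean available = false;
		try {
			method = Class.forName(className).getMethod("length");
			available = true;
		} catch (ReflectiveOperationException e) {
			// The class or method is missing; leave the method null and the flag false.
		}
		this.lengthMethod = method;
		this.isAvailable = available;
	}

	boolean isAvailable() {
		return isAvailable;
	}

	int length(Object target) {
		if (!isAvailable) {
			throw new IllegalStateException("length() is not available on the configured class.");
		}
		try {
			// Safe to dereference: lengthMethod is non-null whenever isAvailable is true.
			return (Integer) lengthMethod.invoke(target);
		} catch (ReflectiveOperationException e) {
			throw new RuntimeException(e);
		}
	}
}
```

   Every dereference of the nullable field is guarded by the flag, which is the invariant the comment on the fields would document.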

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.1+ or 2.10+.
+ */
+class ResourceInformationReflector {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ResourceInformationReflector.class);
+
+	static final ResourceInformationReflector INSTANCE = new ResourceInformationReflector();
+
+	/** Class used to set the extended resource. */
+	private static final String RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+	private final Method resourceSetResourceInformationMethod;
+	private final Method resourceGetResourcesMethod;
+	private final Method resourceInformationGetNameMethod;
+	private final Method resourceInformationGetValueMethod;
+	private final Method resourceInformationNewInstanceMethod;
+	private final boolean isYarnResourceTypesAvailable;
+
+	private ResourceInformationReflector() {
+		this(Resource.class.getName(), RESOURCE_INFO_CLASS);
+	}
+
+	@VisibleForTesting
+	ResourceInformationReflector(String resourceClassName, String resourceInfoClassName) {
+		Method resourceSetResourceInformationMethod = null;
+		Method resourceGetResourcesMethod = null;
+		Method resourceInformationGetNameMethod = null;
+		Method resourceInformationGetValueMethod = null;
+		Method resourceInformationNewInstanceMethod = null;
+		boolean isYarnResourceTypesAvailable = false;
+		try {
+			final Class<?> resourceClass = Class.forName(resourceClassName);
+			final Class<?> resourceInfoClass = Class.forName(resourceInfoClassName);
+			resourceSetResourceInformationMethod = resourceClass.getMethod("setResourceInformation", String.class, resourceInfoClass);
+			resourceGetResourcesMethod = resourceClass.getMethod("getResources");
+			resourceInformationGetNameMethod = resourceInfoClass.getMethod("getName");
+			resourceInformationGetValueMethod = resourceInfoClass.getMethod("getValue");
+			resourceInformationNewInstanceMethod = resourceInfoClass.getMethod("newInstance", String.class, long.class);
+			isYarnResourceTypesAvailable = true;
+		} catch (Exception e) {
+			LOG.debug("The underlying Yarn does not support external resource.", e);
+		} finally {
+			this.resourceSetResourceInformationMethod = resourceSetResourceInformationMethod;
+			this.resourceGetResourcesMethod = resourceGetResourcesMethod;
+			this.resourceInformationGetNameMethod = resourceInformationGetNameMethod;
+			this.resourceInformationGetValueMethod = resourceInformationGetValueMethod;
+			this.resourceInformationNewInstanceMethod = resourceInformationNewInstanceMethod;
+			this.isYarnResourceTypesAvailable = isYarnResourceTypesAvailable;
+		}
+	}
+
+	/**
+	 * Add the given resourceName and value to the {@link Resource}.
+	 */
+	void setResourceInformation(Resource resource, String resourceName, long amount) {
+		setResourceInformationUnSafe(resource, resourceName, amount);
+	}
+
+	/**
+	 * Same as {@link #setResourceInformation(Resource, String, long)} but
+	 * allows to pass objects that are not of type {@link Resource}.
+	 */
+	@VisibleForTesting
+	void setResourceInformationUnSafe(Object resource, String resourceName, long amount) {

Review comment:
       I think we can remove the annotation and make the method private. It is not used by any test cases.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.1+ or 2.10+.
+ */
+class ResourceInformationReflector {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ResourceInformationReflector.class);
+
+	static final ResourceInformationReflector INSTANCE = new ResourceInformationReflector();
+
+	/** Class used to set the extended resource. */
+	private static final String RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+	private final Method resourceSetResourceInformationMethod;
+	private final Method resourceGetResourcesMethod;
+	private final Method resourceInformationGetNameMethod;
+	private final Method resourceInformationGetValueMethod;
+	private final Method resourceInformationNewInstanceMethod;
+	private final boolean isYarnResourceTypesAvailable;
+
+	private ResourceInformationReflector() {
+		this(Resource.class.getName(), RESOURCE_INFO_CLASS);
+	}
+
+	@VisibleForTesting
+	ResourceInformationReflector(String resourceClassName, String resourceInfoClassName) {
+		Method resourceSetResourceInformationMethod = null;
+		Method resourceGetResourcesMethod = null;
+		Method resourceInformationGetNameMethod = null;
+		Method resourceInformationGetValueMethod = null;
+		Method resourceInformationNewInstanceMethod = null;
+		boolean isYarnResourceTypesAvailable = false;
+		try {
+			final Class<?> resourceClass = Class.forName(resourceClassName);
+			final Class<?> resourceInfoClass = Class.forName(resourceInfoClassName);
+			resourceSetResourceInformationMethod = resourceClass.getMethod("setResourceInformation", String.class, resourceInfoClass);
+			resourceGetResourcesMethod = resourceClass.getMethod("getResources");
+			resourceInformationGetNameMethod = resourceInfoClass.getMethod("getName");
+			resourceInformationGetValueMethod = resourceInfoClass.getMethod("getValue");
+			resourceInformationNewInstanceMethod = resourceInfoClass.getMethod("newInstance", String.class, long.class);
+			isYarnResourceTypesAvailable = true;
+		} catch (Exception e) {
+			LOG.debug("The underlying Yarn does not support external resource.", e);
+		} finally {
+			this.resourceSetResourceInformationMethod = resourceSetResourceInformationMethod;
+			this.resourceGetResourcesMethod = resourceGetResourcesMethod;
+			this.resourceInformationGetNameMethod = resourceInformationGetNameMethod;
+			this.resourceInformationGetValueMethod = resourceInformationGetValueMethod;
+			this.resourceInformationNewInstanceMethod = resourceInformationNewInstanceMethod;
+			this.isYarnResourceTypesAvailable = isYarnResourceTypesAvailable;
+		}
+	}
+
+	/**
+	 * Add the given resourceName and value to the {@link Resource}.
+	 */
+	void setResourceInformation(Resource resource, String resourceName, long amount) {
+		setResourceInformationUnSafe(resource, resourceName, amount);
+	}
+
+	/**
+	 * Same as {@link #setResourceInformation(Resource, String, long)} but
+	 * allows to pass objects that are not of type {@link Resource}.
+	 */
+	@VisibleForTesting
+	void setResourceInformationUnSafe(Object resource, String resourceName, long amount) {
+		if (!isYarnResourceTypesAvailable) {
+			LOG.info("Will not request extended resource {} because the underlying YARN does not support it.", resourceName);
+		}

Review comment:
       Shouldn't the method return inside the if block?
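
The concern can be illustrated with a minimal, self-contained sketch. It is not the PR's code: `GuardedReflectorSketch` and `trySet` are hypothetical names, and `java.util.HashMap#put` merely stands in for the reflectively resolved YARN method. Without the `return` inside the guard, execution would continue and dereference a `Method` field that is still null.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a reflector; not the class from the PR. */
class GuardedReflectorSketch {

    // Stays null when the target class has no public put(Object, Object).
    private final Method putMethod;

    GuardedReflectorSketch(Class<?> targetClass) {
        Method method = null;
        try {
            method = targetClass.getMethod("put", Object.class, Object.class);
        } catch (NoSuchMethodException e) {
            // The target class does not offer the method; keep the field null.
        }
        this.putMethod = method;
    }

    /** Returns true if the value was set, false if the call was skipped or failed. */
    boolean trySet(Object target, String name, long amount) {
        if (putMethod == null) {
            // Early return: nothing below may touch the unresolved Method.
            return false;
        }
        try {
            putMethod.invoke(target, name, amount);
            return true;
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> resources = new HashMap<>();
        // Object has no put method, so the first call is skipped.
        System.out.println(new GuardedReflectorSketch(Object.class).trySet(resources, "gpu", 1L));
        System.out.println(new GuardedReflectorSketch(HashMap.class).trySet(resources, "gpu", 1L));
        System.out.println(resources);
    }
}
```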

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -145,6 +151,14 @@ private boolean resourceWithinMaxAllocation(final InternalContainerResource reso
 		return resource.memory <= maxMemMB && resource.vcores <= maxVcore;
 	}
 
+	@VisibleForTesting
+	static void setExternalResourceRequestIfPossible(Map<String, Long> externalResources, Resource resource) {
+		for (Map.Entry<String, Long> externalResource: externalResources.entrySet()) {
+			ResourceInformationReflector.INSTANCE.setResourceInformation(resource, externalResource.getKey(), externalResource.getValue());
+			LOG.info("Successfully request the external resource {} with amount {}.", externalResource.getKey(), externalResource.getValue());

Review comment:
       I would not add this log here.
   - The external resources are identical for all containers, so there is no need to repeat this log for every container.
   - At this point, we can only say that the external resources have been set on the `Resource`. We have not yet successfully requested a container with this `Resource`.
   
   Instead, I would suggest to:
   - Log the external resources only once, in `YarnResourceManager`, where they are derived.
   - Log whether the external resources will be applied or ignored in `ResourceInformationReflector`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -155,4 +163,65 @@ public void testGetExternalResourcesWithMultipleExternalResource() {
 		assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), is(RESOURCE_AMOUNT_1));
 		assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), is(RESOURCE_AMOUNT_2));
 	}
+
+	@Test
+	public void testConstructExternalResourceDriversFromConfig() throws Exception {

Review comment:
       This test case does not throw any checked exceptions, so the `throws Exception` clause can be dropped. The same applies to the other test cases below.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -114,4 +119,53 @@ private ExternalResourceUtils() {
 
 		return externalResourceConfigs;
 	}
+
+	/**
+	 * Instantiate the {@link ExternalResourceDriver}s for all of enabled external resources. {@link ExternalResourceDriver}s
+	 * are mapped by its resource name.
+	 */
+	public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) {
+		final Set<String> resourceSet = getExternalResourceSet(config);
+		LOG.info("Enabled external resources: {}", resourceSet);
+
+		if (resourceSet.isEmpty()) {
+			return Collections.emptyMap();
+		}
+
+		final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class);
+		final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>();
+		factoryIterator.forEachRemaining(
+			externalResourceDriverFactory -> {
+				externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory);
+			});

Review comment:
       ```suggestion
   			externalResourceDriverFactory -> externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory));
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -120,6 +121,35 @@ private ExternalResourceUtils() {
 		return externalResourceConfigs;
 	}
 
+	/**
+	 * Get the map of resource name and amount of all of enabled external resources.
+	 */
+	private static Map<String, Long> getExternalResourceAmountMap(Configuration config) {
+		final Set<String> resourceSet = getExternalResourceSet(config);
+		LOG.info("Enabled external resources: {}", resourceSet);
+
+		if (resourceSet.isEmpty()) {
+			return Collections.emptyMap();
+		}
+
+		final Map<String, Long> externalResourceAmountMap = new HashMap<>();
+		for (String resourceName: resourceSet) {
+			final ConfigOption<Long> amountOption =
+				key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+					.longType()
+					.noDefaultValue();
+			final Optional<Long> amount = config.getOptional(amountOption);
+			if (!amount.isPresent() || amount.get() <= 0) {
+				LOG.warn("The amount of the {} should be configured and the value should be positive. Will ignore that resource.", resourceName);

Review comment:
       Same here. Let's also log the current status.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+	private ExternalResourceUtils() {
+		throw new UnsupportedOperationException("This class should never be instantiated.");
+	}
+
+	/**
+	 * Get the enabled external resource list from configuration.
+	 */
+	private static Set<String> getExternalResourceSet(Configuration config) {
+		return new HashSet<>(ConfigUtils.decodeListFromConfig(config, ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, x -> x));
+	}
+
+	/**
+	 * Get the external resources map for Kubernetes. The key should be used for deployment specific container request,
+	 * and values should be the amount of that resource.
+	 */
+	public static Map<String, Long> getExternalResourcesForKubernetes(Configuration config) {
+		return getExternalResources(config, ExternalResourceOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX);
+	}
+
+	/**
+	 * Get the external resources map for Yarn. The key should be used for deployment specific container request,
+	 * and values should be the amount of that resource.
+	 */
+	public static Map<String, Long> getExternalResourcesForYarn(Configuration config) {
+		return getExternalResources(config, ExternalResourceOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX);
+	}
+
+	/**
+	 * Get the external resources map.
+	 */
+	@VisibleForTesting
+	static Map<String, Long> getExternalResources(Configuration config, String suffix) {
+		final Set<String> resourceSet = getExternalResourceSet(config);
+		LOG.info("Enabled external resources: {}", resourceSet);
+
+		if (resourceSet.isEmpty()) {
+			return Collections.emptyMap();
+		}
+
+		final Map<String, Long> externalResourceConfigs = new HashMap<>();
+		for (String resourceName: resourceSet) {
+			final ConfigOption<Long> amountOption =
+				key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+					.longType()
+					.noDefaultValue();
+			final ConfigOption<String> configKeyOption =
+				key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, suffix))
+					.stringType()
+					.noDefaultValue();
+			final String configKey = config.getString(configKeyOption);
+			final Optional<Long> amount = config.getOptional(amountOption);
+
+			if (StringUtils.isNullOrWhitespaceOnly(configKey)) {
+				LOG.warn("Could not find valid {} for {}. Will ignore that resource.", configKeyOption.key(), resourceName);
+				continue;
+			}
+			if (!amount.isPresent() || amount.get() <= 0) {
+				LOG.warn("The amount of the {} should be configured and the value should be positive. Will ignore that resource.", resourceName);
+				continue;
+			}
+
+			if (externalResourceConfigs.put(configKey, amount.get()) != null) {
+				LOG.warn("Duplicate config key {} occurred for external resources, the one with amount {} will overwrite the value.", configKey, amount);
+			} else {
+				LOG.info("Add external resources configs with key {} value {}.", configKey, amount);
+			}
+		}
+
+		return externalResourceConfigs;
+	}
+
+	/**
+	 * Get the map of resource name and amount of all of enabled external resources.
+	 */
+	private static Map<String, Long> getExternalResourceAmountMap(Configuration config) {
+		final Set<String> resourceSet = getExternalResourceSet(config);
+		LOG.info("Enabled external resources: {}", resourceSet);
+
+		if (resourceSet.isEmpty()) {
+			return Collections.emptyMap();
+		}
+
+		final Map<String, Long> externalResourceAmountMap = new HashMap<>();
+		for (String resourceName: resourceSet) {
+			final ConfigOption<Long> amountOption =
+				key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+					.longType()
+					.noDefaultValue();
+			final Optional<Long> amount = config.getOptional(amountOption);
+			if (!amount.isPresent() || amount.get() <= 0) {
+				LOG.warn("The amount of the {} should be configured and the value should be positive. Will ignore that resource.", resourceName);
+				continue;
+			} else {
+				externalResourceAmountMap.put(resourceName, amount.get());
+			}
+		}
+
+		return externalResourceAmountMap;
+	}
+
+	/**
+	 * Instantiate the {@link ExternalResourceDriver}s for all of enabled external resources. {@link ExternalResourceDriver}s
+	 * are mapped by its resource name.
+	 */
+	public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) {
+		final Set<String> resourceSet = getExternalResourceSet(config);
+		LOG.info("Enabled external resources: {}", resourceSet);
+
+		if (resourceSet.isEmpty()) {
+			return Collections.emptyMap();
+		}
+
+		final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class);
+		final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>();
+		factoryIterator.forEachRemaining(
+			externalResourceDriverFactory -> {
+				externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory);
+			});
+
+		final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>();
+		for (String resourceName: resourceSet) {
+			final ConfigOption<String> driverClassOption =
+				key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+					.stringType()
+					.noDefaultValue();
+			final String driverFactoryClassName = config.getString(driverClassOption);
+			if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) {
+				LOG.warn("Could not found driver class name for {}. Please make sure {} is configured.",
+					resourceName, driverClassOption.key());
+				continue;
+			}
+
+			if (externalResourceFactories.containsKey(driverFactoryClassName)) {
+				DelegatingConfiguration delegatingConfiguration =
+					new DelegatingConfiguration(config, ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX));
+				try {
+					externalResourceDrivers.put(resourceName, externalResourceFactories.get(driverFactoryClassName).createExternalResourceDriver(delegatingConfiguration));

Review comment:
       `externalResourceFactories` is accessed twice here (`containsKey` followed by `get`).
   We could call `externalResourceFactories.get(driverFactoryClassName)` once and then check whether the returned value is null.
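
A minimal sketch of the single-lookup pattern, under simplifying assumptions: `SingleLookupSketch` and `createDrivers` are hypothetical names, and a `Function<String, String>` stands in for `ExternalResourceDriverFactory` so the example stays self-contained. The null check is equivalent to `containsKey` only as long as the map never stores null values, which should hold for a map of loaded factories.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical illustration of get-then-null-check; not the PR's code. */
class SingleLookupSketch {

    static Map<String, String> createDrivers(
            Map<String, Function<String, String>> factoriesByClassName,
            Map<String, String> factoryClassNameByResource) {
        final Map<String, String> drivers = new HashMap<>();
        for (Map.Entry<String, String> entry : factoryClassNameByResource.entrySet()) {
            final String resourceName = entry.getKey();
            // Single lookup: get() returns null when no factory is registered under that name.
            final Function<String, String> factory = factoriesByClassName.get(entry.getValue());
            if (factory == null) {
                // No matching factory; real code would log a warning here.
                continue;
            }
            drivers.put(resourceName, factory.apply(resourceName));
        }
        return drivers;
    }

    public static void main(String[] args) {
        Map<String, Function<String, String>> factories = new HashMap<>();
        factories.put("com.example.GpuFactory", name -> "driver-for-" + name);

        Map<String, String> configured = new HashMap<>();
        configured.put("gpu", "com.example.GpuFactory");
        configured.put("fpga", "com.example.MissingFactory");

        // Only the resource with a registered factory ends up in the result.
        System.out.println(createDrivers(factories, configured));
    }
}
```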

##########
File path: flink-yarn/src/test/java/org/apache/flink/yarn/ResourceInformationReflectorTest.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link ResourceInformationReflector}.
+ */
+public class ResourceInformationReflectorTest extends TestLogger {
+
+	private static final String RESOURCE_NAME = "test";
+	private static final long RESOURCE_VALUE = 1;
+
+	@Test
+	public void testSetResourceInformationIfMethodPresent() {
+		final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithMethod.class.getName(), ResourceInfoWithMethod.class.getName());
+		final ResourceWithMethod resourceWithMethod = new ResourceWithMethod();
+		resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, RESOURCE_NAME, RESOURCE_VALUE);
+
+		assertNotNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+		assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getName(), is(RESOURCE_NAME));
+		assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getValue(), is(RESOURCE_VALUE));
+	}
+
+	@Test
+	public void testGetResourceInformationIfMethodPresent() {
+		final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithMethod.class.getName(), ResourceInfoWithMethod.class.getName());
+		final ResourceWithMethod resourceWithMethod = new ResourceWithMethod();
+		resourceWithMethod.setResourceInformation(RESOURCE_NAME, ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+		final Map<String, Long> externalResources = resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+		assertThat(externalResources.size(), is(1));
+		assertTrue(externalResources.containsKey(RESOURCE_NAME));
+		assertThat(externalResources.get(RESOURCE_NAME), is(RESOURCE_VALUE));
+	}
+
+	@Test
+	public void testSetResourceInformationIfMethodAbsent() {
+		final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), ResourceInfoWithMethod.class.getName());
+		final ResourceWithMethod resourceWithMethod = new ResourceWithMethod();
+		resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, RESOURCE_NAME, RESOURCE_VALUE);
+
+		assertNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+	}
+
+	@Test
+	public void testGetResourceInformationIfMethodAbsent() {
+		final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), ResourceInfoWithMethod.class.getName());
+		final ResourceWithMethod resourceWithMethod = new ResourceWithMethod();
+		resourceWithMethod.setResourceInformation(RESOURCE_NAME, ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+		final Map<String, Long> externalResources = resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+		assertThat(externalResources.size(), is(0));
+	}
+
+	@Test
+	public void testSetAndGetExtendedResourcesWithYarnSupport() {
+		assumeTrue((HadoopUtils.isMinHadoopVersion(2, 10) && HadoopUtils.isMaxHadoopVersion(3, 0)) ||
+			HadoopUtils.isMinHadoopVersion(3, 1));
+
+		final Resource resource = Resource.newInstance(100, 1);
+
+		ResourceInformationReflector.INSTANCE.setResourceInformation(resource, RESOURCE_NAME, RESOURCE_VALUE);
+
+		final Map<String, Long> externalResourcesResult = ResourceInformationReflector.INSTANCE.getExternalResources(resource);
+		assertTrue(externalResourcesResult.containsKey(RESOURCE_NAME));
+		assertThat(externalResourcesResult.get(RESOURCE_NAME), is(RESOURCE_VALUE));
+	}

Review comment:
       Maybe add another test case for Hadoop versions without external resource support, just to verify that calling the public APIs of the reflector does not lead to failures.
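
One possible shape for such a check, sketched in plain Java without JUnit. `OptionalApiProbe` is a hypothetical stand-in for the reflector, and the class name `does.not.Exist` simulates a Hadoop version that lacks the API. The point is only that every entry point degrades to a harmless default instead of throwing when the reflective lookup fails.

```java
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for a reflector over an optional API; not the PR's class. */
class OptionalApiProbe {

    // Stays null when the class or the getter cannot be resolved.
    private final Method getter;

    OptionalApiProbe(String className, String getterName) {
        Method method = null;
        try {
            method = Class.forName(className).getMethod(getterName);
        } catch (ReflectiveOperationException e) {
            // Class or method is missing: treat the API as unsupported.
        }
        this.getter = method;
    }

    boolean isAvailable() {
        return getter != null;
    }

    /** Returns the map produced by the getter, or an empty map if unsupported. */
    Map<?, ?> read(Object target) {
        if (getter == null) {
            return Collections.emptyMap();
        }
        try {
            final Object result = getter.invoke(target);
            return result instanceof Map ? (Map<?, ?>) result : Collections.emptyMap();
        } catch (ReflectiveOperationException | IllegalArgumentException e) {
            return Collections.emptyMap();
        }
    }

    public static void main(String[] args) {
        // Simulates an environment where the optional class does not exist.
        OptionalApiProbe probe = new OptionalApiProbe("does.not.Exist", "getResources");
        if (probe.isAvailable() || !probe.read(new Object()).isEmpty()) {
            throw new AssertionError("unsupported API must yield an empty result");
        }
        System.out.println("Public entry points are safe without the optional API.");
    }
}
```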



