You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/05/17 15:39:12 UTC

[flink] 10/11: [FLINK-17407] Instatiate and pass ExternalResourceInfoProvider to RuntimeContext

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 22112f12b07d20aed43705776cf93fbdc115ed23
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Fri Apr 24 12:06:24 2020 +0800

    [FLINK-17407] Instatiate and pass ExternalResourceInfoProvider to RuntimeContext
---
 .../flink/api/common/functions/RuntimeContext.java |  11 ++
 .../common/functions/util/RuntimeUDFContext.java   |   7 ++
 .../flink/core/plugin/TestingPluginManager.java    |  42 ++++++++
 .../flink/cep/operator/CepRuntimeContext.java      |   7 ++
 .../state/api/runtime/SavepointEnvironment.java    |   8 ++
 .../state/api/runtime/SavepointRuntimeContext.java |   7 ++
 .../flink/runtime/execution/Environment.java       |   8 ++
 .../ExternalResourceInfoProvider.java              |  40 +++++++
 .../externalresource/ExternalResourceUtils.java    | 114 +++++++++++++++++++-
 .../StaticExternalResourceInfoProvider.java        |  53 +++++++++
 .../iterative/task/AbstractIterativeTask.java      |   9 +-
 .../flink/runtime/minicluster/MiniCluster.java     |   2 +
 .../apache/flink/runtime/operators/BatchTask.java  |   2 +-
 .../flink/runtime/operators/DataSinkTask.java      |   2 +-
 .../flink/runtime/operators/DataSourceTask.java    |   3 +-
 .../runtime/operators/chaining/ChainedDriver.java  |   2 +-
 .../util/DistributedRuntimeUDFContext.java         |  16 ++-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   7 ++
 .../runtime/taskexecutor/TaskManagerRunner.java    |  10 ++
 .../runtime/taskmanager/RuntimeEnvironment.java    |  11 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |   9 +-
 .../ExternalResourceUtilsTest.java                 | 120 +++++++++++++++++++++
 .../TestingExternalResourceDriver.java             |  43 ++++++++
 .../TestingExternalResourceDriverFactory.java      |  34 ++++++
 ...TestingFailedExternalResourceDriverFactory.java |  34 ++++++
 .../operators/testutils/DummyEnvironment.java      |   6 ++
 .../operators/testutils/MockEnvironment.java       |  13 ++-
 .../testutils/MockEnvironmentBuilder.java          |  10 +-
 .../TaskExecutorPartitionLifecycleTest.java        |   2 +
 .../taskexecutor/TaskExecutorSlotLifetimeTest.java |   2 +
 .../runtime/taskexecutor/TaskExecutorTest.java     |   3 +
 .../taskexecutor/TaskManagerRunnerStartupTest.java |   2 +
 .../TaskSubmissionTestEnvironment.java             |   2 +
 .../runtime/taskexecutor/TestingTaskExecutor.java  |   3 +
 .../runtime/taskmanager/TaskAsyncCallTest.java     |   2 +
 .../flink/runtime/taskmanager/TestTaskBuilder.java |   8 ++
 .../runtime/util/JvmExitOnFatalErrorTest.java      |   2 +
 .../api/functions/async/RichAsyncFunction.java     |   7 ++
 .../api/operators/AbstractStreamOperator.java      |   3 +-
 .../api/operators/AbstractStreamOperatorV2.java    |   3 +-
 .../api/operators/StreamingRuntimeContext.java     |  16 ++-
 .../api/operators/StreamingRuntimeContextTest.java |   4 +-
 .../tasks/InterruptSensitiveRestoreTest.java       |   2 +
 .../runtime/tasks/StreamMockEnvironment.java       |   6 ++
 .../runtime/tasks/StreamTaskTerminationTest.java   |   2 +
 .../runtime/tasks/SynchronousCheckpointITCase.java |   2 +
 .../tasks/TaskCheckpointingBehaviourTest.java      |   2 +
 47 files changed, 684 insertions(+), 19 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index efebf99..18c6e5a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.accumulators.Histogram;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingState;
@@ -44,6 +45,7 @@ import org.apache.flink.metrics.MetricGroup;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
@@ -176,6 +178,15 @@ public interface RuntimeContext {
 	@PublicEvolving
 	Histogram getHistogram(String name);
 
+	/**
+	 * Get the specific external resource information by the resourceName.
+	 *
+	 * @param resourceName of the required external resource
+	 * @return information set of the external resource identified by the resourceName
+	 */
+	@PublicEvolving
+	Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName);
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
index bab2eb0..a069e61 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.core.fs.Path;
@@ -30,6 +31,7 @@ import org.apache.flink.metrics.MetricGroup;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Future;
 
 /**
@@ -102,6 +104,11 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
 		}
 	}
 
+	@Override
+	public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
+		throw new UnsupportedOperationException("Do not support external resource in current environment");
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	public void setBroadcastVariable(String name, List<?> value) {
diff --git a/flink-core/src/test/java/org/apache/flink/core/plugin/TestingPluginManager.java b/flink-core/src/test/java/org/apache/flink/core/plugin/TestingPluginManager.java
new file mode 100644
index 0000000..455d55c
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/plugin/TestingPluginManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.commons.collections.IteratorUtils;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link PluginManager} for testing purpose.
+ */
+public class TestingPluginManager implements PluginManager {
+
+	private final Map<Class<?>, Iterator<?>> plugins;
+
+	public TestingPluginManager(Map<Class<?>, Iterator<?>> plugins) {
+		this.plugins = plugins;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <P> Iterator<P> load(Class<P> service) {
+		return (Iterator<P>) plugins.getOrDefault(service, IteratorUtils.emptyIterator());
+	}
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
index 0518dad..af7ca0c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.accumulators.Histogram;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingState;
@@ -45,6 +46,7 @@ import org.apache.flink.metrics.MetricGroup;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -115,6 +117,11 @@ class CepRuntimeContext implements RuntimeContext {
 		return runtimeContext.getDistributedCache();
 	}
 
+	@Override
+	public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName){
+		return runtimeContext.getExternalResourceInfos(resourceName);
+	}
+
 	// -----------------------------------------------------------------------------------
 	// Unsupported operations
 	// -----------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
index 93f3fbd..fca7a74 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -199,6 +200,13 @@ public class SavepointEnvironment implements Environment {
 	}
 
 	@Override
+	public ExternalResourceInfoProvider getExternalResourceInfoProvider() {
+		// We will still construct a StreamingRuntimeContext from this SavepointEnvironment at the moment. So, throwing
+		// Exception here would cause issue. When there is a bounded DataStream API, this class would be removed.
+		return ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES;
+	}
+
+	@Override
 	public AccumulatorRegistry getAccumulatorRegistry() {
 		return accumulatorRegistry;
 	}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
index c3002c8..dc25468 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.accumulators.Histogram;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingState;
@@ -50,6 +51,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * A streaming {@link RuntimeContext} which delegates to the underlying batch {@code RuntimeContext}
@@ -162,6 +164,11 @@ public final class SavepointRuntimeContext implements RuntimeContext {
 	}
 
 	@Override
+	public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
+		throw new UnsupportedOperationException("Do not support external resource in current environment");
+	}
+
+	@Override
 	public boolean hasBroadcastVariable(String name) {
 		return ctx.hasBroadcastVariable(name);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index c29d909..cee6c6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -158,6 +159,13 @@ public interface Environment {
 	GlobalAggregateManager getGlobalAggregateManager();
 
 	/**
+	 * Get the {@link ExternalResourceInfoProvider} which contains infos of available external resources.
+	 *
+	 * @return {@link ExternalResourceInfoProvider} which contains infos of available external resources
+	 */
+	ExternalResourceInfoProvider getExternalResourceInfoProvider();
+
+	/**
 	 * Return the registry for accumulators which are periodically sent to the job manager.
 	 * @return the registry
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceInfoProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceInfoProvider.java
new file mode 100644
index 0000000..f4f92fd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceInfoProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Provide the information of external resources.
+ */
+public interface ExternalResourceInfoProvider {
+
+	ExternalResourceInfoProvider NO_EXTERNAL_RESOURCES = resourceName -> Collections.emptySet();
+
+	/**
+	 * Get the specific external resource information by the resourceName.
+	 *
+	 * @param resourceName of the required external resource
+	 * @return information set of the external resource identified by the resourceName
+	 */
+	Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
index e3d41e0..efa3752 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
@@ -18,9 +18,14 @@
 
 package org.apache.flink.runtime.externalresource;
 
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
@@ -29,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -54,7 +60,12 @@ public class ExternalResourceUtils {
 	}
 
 	/**
-	 * Get the external resources map.
+	 * Get the external resources map. The key should be used for deployment specific container request,
+	 * and values should be the amount of that resource.
+	 *
+	 * @param config Configurations
+	 * @param suffix suffix of config option for deployment specific configuration key
+	 * @return external resources map, map the amount to the configuration key for deployment specific container request
 	 */
 	public static Map<String, Long> getExternalResources(Configuration config, String suffix) {
 		final Set<String> resourceSet = getExternalResourceSet(config);
@@ -98,4 +109,105 @@ public class ExternalResourceUtils {
 
 		return externalResourceConfigs;
 	}
+
+	/**
+	 * Get the map of resource name and amount of all of enabled external resources.
+	 */
+	public static Map<String, Long> getExternalResourceAmountMap(Configuration config) {
+		final Set<String> resourceSet = getExternalResourceSet(config);
+		LOG.info("Enabled external resources: {}", resourceSet);
+
+		if (resourceSet.isEmpty()) {
+			return Collections.emptyMap();
+		}
+
+		final Map<String, Long> externalResourceAmountMap = new HashMap<>();
+		for (String resourceName: resourceSet) {
+			final ConfigOption<Long> amountOption =
+				key(ExternalResourceOptions.getAmountConfigOptionForResource(resourceName))
+					.longType()
+					.noDefaultValue();
+			final Optional<Long> amountOpt = config.getOptional(amountOption);
+			if (!amountOpt.isPresent()) {
+				LOG.warn("The amount of the {} should be configured. Will ignore that resource.", resourceName);
+			} else if (amountOpt.get() <= 0) {
+				LOG.warn("The amount of the {} should be positive while finding {}. Will ignore that resource.", amountOpt.get(), resourceName);
+			} else {
+				externalResourceAmountMap.put(resourceName, amountOpt.get());
+			}
+		}
+
+		return externalResourceAmountMap;
+	}
+
+	/**
+	 * Instantiate the {@link ExternalResourceDriver ExternalResourceDrivers} for all of enabled external resources. {@link ExternalResourceDriver ExternalResourceDrivers}
+	 * are mapped to its resource name.
+	 */
+	public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) {
+		final Set<String> resourceSet = getExternalResourceSet(config);
+		LOG.info("Enabled external resources: {}", resourceSet);
+
+		if (resourceSet.isEmpty()) {
+			return Collections.emptyMap();
+		}
+
+		final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class);
+		final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>();
+		factoryIterator.forEachRemaining(
+			externalResourceDriverFactory -> externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory));
+
+		final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>();
+		for (String resourceName: resourceSet) {
+			final ConfigOption<String> driverClassOption =
+				key(ExternalResourceOptions.getExternalResourceDriverFactoryConfigOptionForResource(resourceName))
+					.stringType()
+					.noDefaultValue();
+			final String driverFactoryClassName = config.getString(driverClassOption);
+			if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) {
+				LOG.warn("Could not find driver class name for {}. Please make sure {} is configured.",
+					resourceName, driverClassOption.key());
+				continue;
+			}
+
+			ExternalResourceDriverFactory externalResourceDriverFactory = externalResourceFactories.get(driverFactoryClassName);
+			if (externalResourceDriverFactory != null) {
+				DelegatingConfiguration delegatingConfiguration =
+					new DelegatingConfiguration(config, ExternalResourceOptions.getExternalResourceParamConfigPrefixForResource(resourceName));
+				try {
+					externalResourceDrivers.put(resourceName, externalResourceDriverFactory.createExternalResourceDriver(delegatingConfiguration));
+					LOG.info("Add external resources driver for {}.", resourceName);
+				} catch (Exception e) {
+					LOG.warn("Could not instantiate driver with factory {} for {}. {}", driverFactoryClassName, resourceName, e);
+				}
+			} else {
+				LOG.warn("Could not find factory class {} for {}.", driverFactoryClassName, resourceName);
+			}
+		}
+
+		return externalResourceDrivers;
+	}
+
+	/**
+	 * Instantiate {@link StaticExternalResourceInfoProvider} for all of enabled external resources.
+	 */
+	public static ExternalResourceInfoProvider createStaticExternalResourceInfoProvider(Map<String, Long> externalResourceAmountMap, Map<String, ExternalResourceDriver> externalResourceDrivers) {
+		final Map<String, Set<? extends ExternalResourceInfo>> externalResources = new HashMap<>();
+		for (Map.Entry<String, ExternalResourceDriver> externalResourceDriverEntry : externalResourceDrivers.entrySet()) {
+			final String resourceName = externalResourceDriverEntry.getKey();
+			final ExternalResourceDriver externalResourceDriver = externalResourceDriverEntry.getValue();
+			if (externalResourceAmountMap.containsKey(resourceName)) {
+				try {
+					final Set<? extends ExternalResourceInfo> externalResourceInfos;
+					externalResourceInfos = externalResourceDriver.retrieveResourceInfo(externalResourceAmountMap.get(resourceName));
+					externalResources.put(resourceName, externalResourceInfos);
+				} catch (Exception e) {
+					LOG.warn("Failed to retrieve information of external resource {}.", resourceName, e);
+				}
+			} else {
+				LOG.warn("Could not found legal amount configuration for {}.", resourceName);
+			}
+		}
+		return new StaticExternalResourceInfoProvider(externalResources);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/StaticExternalResourceInfoProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/StaticExternalResourceInfoProvider.java
new file mode 100644
index 0000000..d124c5a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/StaticExternalResourceInfoProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Static implementation of {@link ExternalResourceInfoProvider} which return fixed collection
+ * of {@link ExternalResourceInfo}.
+ */
+public class StaticExternalResourceInfoProvider implements ExternalResourceInfoProvider {
+
+	private final Map<String, Set<? extends ExternalResourceInfo>> externalResources;
+
+	public StaticExternalResourceInfoProvider(Map<String, Set<? extends ExternalResourceInfo>> externalResources) {
+		this.externalResources = externalResources;
+	}
+
+	@Override
+	public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
+		if (!externalResources.containsKey(resourceName)) {
+			return Collections.emptySet();
+		}
+
+		return Collections.unmodifiableSet(externalResources.get(resourceName));
+	}
+
+	@VisibleForTesting
+	Map<String, Set<? extends ExternalResourceInfo>> getExternalResources() {
+		return externalResources;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
index 19eb3d1..edaa924 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
@@ -183,8 +184,9 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 	@Override
 	public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup metrics) {
 		Environment env = getEnvironment();
+
 		return new IterativeRuntimeUdfContext(env.getTaskInfo(), getUserCodeClassLoader(),
-				getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorMap, metrics);
+				getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorMap, metrics, env.getExternalResourceInfoProvider());
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -375,8 +377,9 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 	private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext {
 
 		public IterativeRuntimeUdfContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
-											Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulatorMap, MetricGroup metrics) {
-			super(taskInfo, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap, metrics);
+											Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulatorMap,
+											MetricGroup metrics, ExternalResourceInfoProvider externalResourceInfoProvider) {
+			super(taskInfo, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap, metrics, externalResourceInfoProvider);
 		}
 
 		@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 75e4740..44e0ad9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceMa
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -536,6 +537,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 				metricRegistry,
 				blobCacheService,
 				useLocalCommunication(),
+				ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
 				taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));
 
 			taskExecutor.start();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index d22d472..a51f376 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -1035,7 +1035,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 		Environment env = getEnvironment();
 
 		return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
-				getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorMap, metrics);
+				getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorMap, metrics, env.getExternalResourceInfoProvider());
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index f2dc9bc..3c80d8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -415,6 +415,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 
 		return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
 				getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), 
-				getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()));
+				getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()), env.getExternalResourceInfoProvider());
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index ba1bb08..6b8dd6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -404,8 +404,9 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 
 		String sourceName =  getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
 		sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
+
 		return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
 				getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), 
-				getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
+				getEnvironment().getMetricGroup().getOrAddOperator(sourceName), env.getExternalResourceInfoProvider());
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index 7dd3a14..f2a782e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -80,7 +80,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
 			this.udfContext = ((BatchTask<?, ?>) parent).createRuntimeContext(metrics);
 		} else {
 			this.udfContext = new DistributedRuntimeUDFContext(env.getTaskInfo(), userCodeClassLoader,
-					parent.getExecutionConfig(), env.getDistributedCacheEntries(), accumulatorMap, metrics
+					parent.getExecutionConfig(), env.getDistributedCacheEntries(), accumulatorMap, metrics, env.getExternalResourceInfoProvider()
 			);
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
index 35c7bab..3c3fcbe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
@@ -21,11 +21,13 @@ package org.apache.flink.runtime.operators.util;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Future;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
@@ -33,6 +35,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization;
 import org.apache.flink.runtime.broadcast.InitializationTypeConflictException;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -41,10 +44,14 @@ import org.apache.flink.util.Preconditions;
 public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext {
 
 	private final HashMap<String, BroadcastVariableMaterialization<?, ?>> broadcastVars = new HashMap<String, BroadcastVariableMaterialization<?, ?>>();
-	
+
+	private final ExternalResourceInfoProvider externalResourceInfoProvider;
+
 	public DistributedRuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
-											Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators, MetricGroup metrics) {
+										Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulators,
+										MetricGroup metrics, ExternalResourceInfoProvider externalResourceInfoProvider) {
 		super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks, metrics);
+		this.externalResourceInfoProvider = Preconditions.checkNotNull(externalResourceInfoProvider);
 	}
 
 	@Override
@@ -87,6 +94,11 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext {
 			throw new IllegalArgumentException("The broadcast variable with name '" + name + "' has not been set.");
 		}
 	}
+
+	@Override
+	public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
+		return externalResourceInfoProvider.getExternalResourceInfos(resourceName);
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 59d72ff..d74532f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatManager;
@@ -202,6 +203,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 	/** The state manager for this task, providing state managers per slot. */
 	private final TaskExecutorLocalStateStoresManager localStateStoresManager;
 
+	/** Information provider for external resources. */
+	private final ExternalResourceInfoProvider externalResourceInfoProvider;
+
 	/** The network component in the task manager. */
 	private final ShuffleEnvironment<?, ?> shuffleEnvironment;
 
@@ -257,6 +261,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			TaskManagerConfiguration taskManagerConfiguration,
 			HighAvailabilityServices haServices,
 			TaskManagerServices taskExecutorServices,
+			ExternalResourceInfoProvider externalResourceInfoProvider,
 			HeartbeatServices heartbeatServices,
 			TaskManagerMetricGroup taskManagerMetricGroup,
 			@Nullable String metricQueryServiceAddress,
@@ -278,6 +283,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		this.blobCacheService = checkNotNull(blobCacheService);
 		this.metricQueryServiceAddress = metricQueryServiceAddress;
 		this.backPressureSampleService = checkNotNull(backPressureSampleService);
+		this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);
 
 		this.libraryCacheManager = taskExecutorServices.getLibraryCacheManager();
 		this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
@@ -620,6 +626,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 				taskExecutorServices.getKvStateService(),
 				taskExecutorServices.getBroadcastVariableManager(),
 				taskExecutorServices.getTaskEventDispatcher(),
+				externalResourceInfoProvider,
 				taskStateManager,
 				taskManagerActions,
 				inputSplitProvider,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index f4add3b..88786bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -145,6 +147,11 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			configuration, highAvailabilityServices.createBlobStore(), null
 		);
 
+		final ExternalResourceInfoProvider externalResourceInfoProvider =
+			ExternalResourceUtils.createStaticExternalResourceInfoProvider(
+				ExternalResourceUtils.getExternalResourceAmountMap(configuration),
+				ExternalResourceUtils.externalResourceDriversFromConfig(configuration, pluginManager));
+
 		taskManager = startTaskManager(
 			this.configuration,
 			this.resourceId,
@@ -154,6 +161,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			metricRegistry,
 			blobCacheService,
 			false,
+			externalResourceInfoProvider,
 			this);
 
 		this.terminationFuture = new CompletableFuture<>();
@@ -330,6 +338,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			MetricRegistry metricRegistry,
 			BlobCacheService blobCacheService,
 			boolean localCommunicationOnly,
+			ExternalResourceInfoProvider externalResourceInfoProvider,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
 
 		checkNotNull(configuration);
@@ -373,6 +382,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			taskManagerConfiguration,
 			highAvailabilityServices,
 			taskManagerServices,
+			externalResourceInfoProvider,
 			heartbeatServices,
 			taskManagerMetricGroup.f0,
 			metricQueryServiceAddress,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index cd04bd3..ca79f91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -70,6 +71,7 @@ public class RuntimeEnvironment implements Environment {
 	private final TaskStateManager taskStateManager;
 	private final GlobalAggregateManager aggregateManager;
 	private final InputSplitProvider splitProvider;
+	private final ExternalResourceInfoProvider externalResourceInfoProvider;
 
 	private final Map<String, Future<Path>> distCacheEntries;
 
@@ -117,7 +119,8 @@ public class RuntimeEnvironment implements Environment {
 			TaskOperatorEventGateway operatorEventGateway,
 			TaskManagerRuntimeInfo taskManagerInfo,
 			TaskMetricGroup metrics,
-			Task containingTask) {
+			Task containingTask,
+			ExternalResourceInfoProvider externalResourceInfoProvider) {
 
 		this.jobId = checkNotNull(jobId);
 		this.jobVertexId = checkNotNull(jobVertexId);
@@ -144,6 +147,7 @@ public class RuntimeEnvironment implements Environment {
 		this.taskManagerInfo = checkNotNull(taskManagerInfo);
 		this.containingTask = containingTask;
 		this.metrics = metrics;
+		this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);
 	}
 
 	// ------------------------------------------------------------------------
@@ -269,6 +273,11 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
+	public ExternalResourceInfoProvider getExternalResourceInfoProvider() {
+		return externalResourceInfoProvider;
+	}
+
+	@Override
 	public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
 		acknowledgeCheckpoint(checkpointId, checkpointMetrics, null);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index dad0b68..2e3c9b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -193,6 +194,9 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
 
 	private final TaskEventDispatcher taskEventDispatcher;
 
+	/** Information provider for external resources. */
+	private final ExternalResourceInfoProvider externalResourceInfoProvider;
+
 	/** The manager for state of operators running in this task/slot. */
 	private final TaskStateManager taskStateManager;
 
@@ -294,6 +298,7 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
 		KvStateService kvStateService,
 		BroadcastVariableManager bcVarManager,
 		TaskEventDispatcher taskEventDispatcher,
+		ExternalResourceInfoProvider externalResourceInfoProvider,
 		TaskStateManager taskStateManager,
 		TaskManagerActions taskManagerActions,
 		InputSplitProvider inputSplitProvider,
@@ -351,6 +356,7 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
 		this.operatorCoordinatorEventGateway = Preconditions.checkNotNull(operatorCoordinatorEventGateway);
 		this.aggregateManager = Preconditions.checkNotNull(aggregateManager);
 		this.taskManagerActions = checkNotNull(taskManagerActions);
+		this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);
 
 		this.classLoaderHandle = Preconditions.checkNotNull(classLoaderHandle);
 		this.fileCache = Preconditions.checkNotNull(fileCache);
@@ -680,7 +686,8 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
 				operatorCoordinatorEventGateway,
 				taskManagerConfig,
 				metrics,
-				this);
+				this,
+				externalResourceInfoProvider);
 
 			// Make sure the user code classloader is accessible thread-locally.
 			// We are setting the correct context class loader before instantiating the invokable
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
index 707bd4b..fb51c29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
@@ -18,18 +18,28 @@
 
 package org.apache.flink.runtime.externalresource;
 
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.TestingPluginManager;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.commons.collections.IteratorUtils;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -123,4 +133,114 @@ public class ExternalResourceUtilsTest extends TestLogger {
 		assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), is(RESOURCE_AMOUNT_1));
 		assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), is(RESOURCE_AMOUNT_2));
 	}
+
+	@Test
+	public void testConstructExternalResourceDriversFromConfig() {
+		final Configuration config = new Configuration();
+		final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName();
+		final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+		plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+		final PluginManager testingPluginManager = new TestingPluginManager(plugins);
+
+		config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1));
+		config.setString(ExternalResourceOptions.getExternalResourceDriverFactoryConfigOptionForResource(RESOURCE_NAME_1), driverFactoryClassName);
+
+		final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManager);
+
+		assertThat(externalResourceDrivers.size(), is(1));
+		assertThat(externalResourceDrivers.get(RESOURCE_NAME_1), instanceOf(TestingExternalResourceDriver.class));
+	}
+
+	@Test
+	public void testNotConfiguredFactoryClass() {
+		final Configuration config = new Configuration();
+		final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+		plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+		final PluginManager testingPluginManager = new TestingPluginManager(plugins);
+
+		config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1));
+
+		final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManager);
+
+		assertThat(externalResourceDrivers.entrySet(), is(empty()));
+	}
+
+	@Test
+	public void testFactoryPluginDoesNotExist() {
+		final Configuration config = new Configuration();
+		final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName();
+		final PluginManager testingPluginManager = new TestingPluginManager(Collections.emptyMap());
+
+		config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1));
+		config.setString(ExternalResourceOptions.getExternalResourceDriverFactoryConfigOptionForResource(RESOURCE_NAME_1), driverFactoryClassName);
+
+		final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManager);
+
+		assertThat(externalResourceDrivers.entrySet(), is(empty()));
+	}
+
+	@Test
+	public void testFactoryFailedToCreateDriver() {
+		final Configuration config = new Configuration();
+		final String driverFactoryClassName = TestingFailedExternalResourceDriverFactory.class.getName();
+		final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+		plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingFailedExternalResourceDriverFactory()));
+		final PluginManager testingPluginManager = new TestingPluginManager(plugins);
+
+		config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1));
+		config.setString(ExternalResourceOptions.getExternalResourceDriverFactoryConfigOptionForResource(RESOURCE_NAME_1), driverFactoryClassName);
+
+		final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManager);
+
+		assertThat(externalResourceDrivers.entrySet(), is(empty()));
+	}
+
+	@Test
+	public void testGetExternalResourceInfoProvider() {
+		final Map<String, Long> externalResourceAmountMap = new HashMap<>();
+		final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>();
+		externalResourceAmountMap.put(RESOURCE_NAME_1, RESOURCE_AMOUNT_1);
+		externalResourceDrivers.put(RESOURCE_NAME_1, new TestingExternalResourceDriver());
+
+		final StaticExternalResourceInfoProvider externalResourceInfoProvider =
+			(StaticExternalResourceInfoProvider) ExternalResourceUtils.createStaticExternalResourceInfoProvider(externalResourceAmountMap, externalResourceDrivers);
+
+		assertNotNull(externalResourceInfoProvider.getExternalResources().get(RESOURCE_NAME_1));
+	}
+
+	@Test
+	public void testGetExternalResourceInfoProviderWithoutAmount() {
+		final Map<String, Long> externalResourceAmountMap = new HashMap<>();
+		final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>();
+		externalResourceDrivers.put(RESOURCE_NAME_1, new TestingExternalResourceDriver());
+
+		final StaticExternalResourceInfoProvider externalResourceInfoProvider =
+			(StaticExternalResourceInfoProvider) ExternalResourceUtils.createStaticExternalResourceInfoProvider(externalResourceAmountMap, externalResourceDrivers);
+
+		assertThat(externalResourceInfoProvider.getExternalResources().entrySet(), is(empty()));
+	}
+
+	@Test
+	public void testGetExternalResourceAmountMap() {
+		final Configuration config = new Configuration();
+		config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1));
+		config.setLong(ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), RESOURCE_AMOUNT_1);
+
+		final Map<String, Long> externalResourceAmountMap = ExternalResourceUtils.getExternalResourceAmountMap(config);
+
+		assertThat(externalResourceAmountMap.size(), is(1));
+		assertTrue(externalResourceAmountMap.containsKey(RESOURCE_NAME_1));
+		assertThat(externalResourceAmountMap.get(RESOURCE_NAME_1), is(RESOURCE_AMOUNT_1));
+	}
+
+	@Test
+	public void testGetExternalResourceAmountMapWithIllegalAmount() {
+		final Configuration config = new Configuration();
+		config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1));
+		config.setLong(ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), 0);
+
+		final Map<String, Long> externalResourceAmountMap = ExternalResourceUtils.getExternalResourceAmountMap(config);
+
+		assertThat(externalResourceAmountMap.entrySet(), is(empty()));
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriver.java
new file mode 100644
index 0000000..ce6387b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriver.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * No-op {@link ExternalResourceDriver} for testing purpose.
+ */
+public class TestingExternalResourceDriver implements ExternalResourceDriver {
+
+	private int callTimes = 0;
+
+	@Override
+	public Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) {
+		callTimes += 1;
+		return Collections.emptySet();
+	}
+
+	public int getCallTimes() {
+		return callTimes;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriverFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriverFactory.java
new file mode 100644
index 0000000..d08e614
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriverFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Implementation of {@link ExternalResourceDriverFactory} for testing purpose.
+ */
+public class TestingExternalResourceDriverFactory implements ExternalResourceDriverFactory {
+
+	@Override
+	public ExternalResourceDriver createExternalResourceDriver(Configuration config) {
+		return new TestingExternalResourceDriver();
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingFailedExternalResourceDriverFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingFailedExternalResourceDriverFactory.java
new file mode 100644
index 0000000..449c310
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingFailedExternalResourceDriverFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Implementation of {@link ExternalResourceDriverFactory} for testing purpose which fails to create an {@link ExternalResourceDriver}.
+ */
+public class TestingFailedExternalResourceDriverFactory implements ExternalResourceDriverFactory {
+
+	@Override
+	public ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception {
+		throw new Exception();
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index d233d79..15b6d07 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -197,6 +198,11 @@ public class DummyEnvironment implements Environment {
 	}
 
 	@Override
+	public ExternalResourceInfoProvider getExternalResourceInfoProvider() {
+		return ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES;
+	}
+
+	@Override
 	public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 2e85a4f..8f97a5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.RecordCollectingResultPartitionWriter;
@@ -119,6 +120,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 	private final TaskMetricGroup taskMetricGroup;
 
+	private final ExternalResourceInfoProvider externalResourceInfoProvider;
+
 	public static MockEnvironmentBuilder builder() {
 		return new MockEnvironmentBuilder();
 	}
@@ -140,7 +143,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
 			ClassLoader userCodeClassLoader,
 			TaskMetricGroup taskMetricGroup,
 			TaskManagerRuntimeInfo taskManagerRuntimeInfo,
-			MemoryManager memManager) {
+			MemoryManager memManager,
+			ExternalResourceInfoProvider externalResourceInfoProvider) {
 
 		this.jobID = jobID;
 		this.jobVertexID = jobVertexID;
@@ -169,6 +173,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
 		this.aggregateManager = Preconditions.checkNotNull(aggregateManager);
 
 		this.taskMetricGroup = taskMetricGroup;
+
+		this.externalResourceInfoProvider = Preconditions.checkNotNull(externalResourceInfoProvider);
 	}
 
 	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
@@ -330,6 +336,11 @@ public class MockEnvironment implements Environment, AutoCloseable {
 	}
 
 	@Override
+	public ExternalResourceInfoProvider getExternalResourceInfoProvider() {
+		return externalResourceInfoProvider;
+	}
+
+	@Override
 	public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
 		throw new UnsupportedOperationException();
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
index d421015..bedd3b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -54,6 +55,7 @@ public class MockEnvironmentBuilder {
 	private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
 	private IOManager ioManager;
 	private MemoryManager memoryManager = buildMemoryManager(1024 * MemoryManager.DEFAULT_PAGE_SIZE);
+	private ExternalResourceInfoProvider externalResourceInfoProvider = ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES;
 
 	private MemoryManager buildMemoryManager(long memorySize) {
 		return MemoryManagerBuilder.newBuilder().setMemorySize(MemoryType.OFF_HEAP, memorySize).build();
@@ -144,6 +146,11 @@ public class MockEnvironmentBuilder {
 		return this;
 	}
 
+	public MockEnvironmentBuilder setExternalResourceInfoProvider(ExternalResourceInfoProvider externalResourceInfoProvider) {
+		this.externalResourceInfoProvider = externalResourceInfoProvider;
+		return this;
+	}
+
 	public MockEnvironment build() {
 		if (ioManager == null) {
 			ioManager = new IOManagerAsync();
@@ -165,6 +172,7 @@ public class MockEnvironmentBuilder {
 			userCodeClassLoader,
 			taskMetricGroup,
 			taskManagerRuntimeInfo,
-			memoryManager);
+			memoryManager,
+			externalResourceInfoProvider);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 58bbfcf..9cc5d69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -458,6 +459,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 				InetAddress.getLoopbackAddress().getHostAddress()),
 			haServices,
 			taskManagerServices,
+			ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
 			new HeartbeatServices(10_000L, 30_000L),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
 			null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
index 097ba56..b5d0592 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
@@ -189,6 +190,7 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger {
 				.setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(1))
 				.setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
 				.build(),
+			ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
 			new TestingHeartbeatServices(),
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
 			null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 4a23726..5d0e165 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
@@ -1867,6 +1868,7 @@ public class TaskExecutorTest extends TestLogger {
 				InetAddress.getLoopbackAddress().getHostAddress()),
 			haServices,
 			taskManagerServices,
+			ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
 			heartbeatServices,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
 			null,
@@ -1889,6 +1891,7 @@ public class TaskExecutorTest extends TestLogger {
 				InetAddress.getLoopbackAddress().getHostAddress()),
 			haServices,
 			taskManagerServices,
+			ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
 			heartbeatServices,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
 			null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index 8d5fadf..d0f2241 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -182,6 +183,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
 				new VoidBlobStore(),
 				null),
 			false,
+			ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
 			error -> {});
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 927622c..824e0f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
@@ -220,6 +221,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 				InetAddress.getLoopbackAddress().getHostAddress()),
 			haServices,
 			taskManagerServices,
+			ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
 			heartbeatServices,
 			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
 			metricQueryServiceAddress,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
index 4556c40..7a4a01f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
@@ -41,6 +42,7 @@ class TestingTaskExecutor extends TaskExecutor {
 			TaskManagerConfiguration taskManagerConfiguration,
 			HighAvailabilityServices haServices,
 			TaskManagerServices taskExecutorServices,
+			ExternalResourceInfoProvider externalResourceInfoProvider,
 			HeartbeatServices heartbeatServices,
 			TaskManagerMetricGroup taskManagerMetricGroup,
 			@Nullable String metricQueryServiceAddress,
@@ -53,6 +55,7 @@ class TestingTaskExecutor extends TaskExecutor {
 			taskManagerConfiguration,
 			haServices,
 			taskExecutorServices,
+			externalResourceInfoProvider,
 			heartbeatServices,
 			taskManagerMetricGroup,
 			metricQueryServiceAddress,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 3858f34..ef77500 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
@@ -198,6 +199,7 @@ public class TaskAsyncCallTest extends TestLogger {
 			new KvStateService(new KvStateRegistry(), null, null),
 			mock(BroadcastVariableManager.class),
 			new TaskEventDispatcher(),
+			ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
 			new TestTaskStateManager(),
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
index 3183f01..2decf57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -85,6 +86,7 @@ public final class TestTaskBuilder {
 	private JobID jobId = new JobID();
 	private AllocationID allocationID = new AllocationID();
 	private ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+	private ExternalResourceInfoProvider externalResourceInfoProvider = ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES;
 
 	public TestTaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
 		this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
@@ -170,6 +172,11 @@ public final class TestTaskBuilder {
 		return this;
 	}
 
+	public TestTaskBuilder setExternalResourceInfoProvider(ExternalResourceInfoProvider externalResourceInfoProvider) {
+		this.externalResourceInfoProvider = externalResourceInfoProvider;
+		return this;
+	}
+
 	public Task build() throws Exception {
 		final JobVertexID jobVertexId = new JobVertexID();
 
@@ -209,6 +216,7 @@ public final class TestTaskBuilder {
 			kvStateService,
 			new BroadcastVariableManager(),
 			new TaskEventDispatcher(),
+			externalResourceInfoProvider,
 			new TestTaskStateManager(),
 			taskManagerActions,
 			new MockInputSplitProvider(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 1eaf46b..eadb9d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -208,6 +209,7 @@ public class JvmExitOnFatalErrorTest {
 						new KvStateService(new KvStateRegistry(), null, null),
 						new BroadcastVariableManager(),
 						new TaskEventDispatcher(),
+						ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
 						slotStateManager,
 						new NoOpTaskManagerActions(),
 						new NoOpInputSplitProvider(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index 8d9ac8f..7b4ac6d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
@@ -51,6 +52,7 @@ import org.apache.flink.util.Preconditions;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Rich variant of the {@link AsyncFunction}. As a {@link RichFunction}, it gives access to the
@@ -149,6 +151,11 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction im
 			return runtimeContext.getUserCodeClassLoader();
 		}
 
+		@Override
+		public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
+			return runtimeContext.getExternalResourceInfos(resourceName);
+		}
+
 		// -----------------------------------------------------------------------------------
 		// Unsupported operations
 		// -----------------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 8a4d215..a698254 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -212,7 +212,8 @@ public abstract class AbstractStreamOperator<OUT>
 			getMetricGroup(),
 			getOperatorID(),
 			getProcessingTimeService(),
-			null);
+			null,
+			environment.getExternalResourceInfoProvider());
 
 		stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
 		stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 9f1fa89..56533db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -143,7 +143,8 @@ public abstract class AbstractStreamOperatorV2<OUT> implements StreamOperator<OU
 			operatorMetricGroup,
 			getOperatorID(),
 			processingTimeService,
-			null);
+			null,
+			environment.getExternalResourceInfoProvider());
 	}
 
 	private LatencyStats createLatencyStats(Configuration taskManagerConfig, int indexInSubtaskGroup) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 65b54a7..e451563 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
 import org.apache.flink.api.common.state.AggregatingState;
@@ -39,6 +40,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
@@ -51,6 +53,7 @@ import javax.annotation.Nullable;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -67,6 +70,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 	private final String operatorUniqueID;
 	private final ProcessingTimeService processingTimeService;
 	private @Nullable KeyedStateStore keyedStateStore;
+	private final ExternalResourceInfoProvider externalResourceInfoProvider;
 
 	@VisibleForTesting
 	public StreamingRuntimeContext(
@@ -79,7 +83,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 			operator.getMetricGroup(),
 			operator.getOperatorID(),
 			operator.getProcessingTimeService(),
-			operator.getKeyedStateStore());
+			operator.getKeyedStateStore(),
+			env.getExternalResourceInfoProvider());
 	}
 
 	public StreamingRuntimeContext(
@@ -88,7 +93,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 			MetricGroup operatorMetricGroup,
 			OperatorID operatorID,
 			ProcessingTimeService processingTimeService,
-			@Nullable KeyedStateStore keyedStateStore) {
+			@Nullable KeyedStateStore keyedStateStore,
+			ExternalResourceInfoProvider externalResourceInfoProvider) {
 		super(checkNotNull(env).getTaskInfo(),
 				env.getUserClassLoader(),
 				env.getExecutionConfig(),
@@ -100,6 +106,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 		this.operatorUniqueID = checkNotNull(operatorID).toString();
 		this.processingTimeService = processingTimeService;
 		this.keyedStateStore = keyedStateStore;
+		this.externalResourceInfoProvider = externalResourceInfoProvider;
 	}
 
 	public void setKeyedStateStore(@Nullable KeyedStateStore keyedStateStore) {
@@ -150,6 +157,11 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 		return taskEnvironment.getTaskManagerInfo();
 	}
 
+	@Override
+	public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
+		return externalResourceInfoProvider.getExternalResourceInfos(resourceName);
+	}
+
 	// ------------------------------------------------------------------------
 	//  broadcast variables
 	// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 1c2390c..fff4ef3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -290,7 +291,8 @@ public class StreamingRuntimeContextTest {
 			operator.getMetricGroup(),
 			operator.getOperatorID(),
 			operator.getProcessingTimeService(),
-			operator.getKeyedStateStore());
+			operator.getKeyedStateStore(),
+			ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES);
 	}
 
 	@SuppressWarnings("unchecked")
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 35a4936..d700920 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
@@ -269,6 +270,7 @@ public class InterruptSensitiveRestoreTest {
 			new KvStateService(new KvStateRegistry(), null, null),
 			mock(BroadcastVariableManager.class),
 			new TaskEventDispatcher(),
+			ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
 			taskStateManager,
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 0092f02..01770f8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -320,6 +321,11 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
+	public ExternalResourceInfoProvider getExternalResourceInfoProvider() {
+		return ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES;
+	}
+
+	@Override
 	public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index cda5d02..4d9690c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
@@ -172,6 +173,7 @@ public class StreamTaskTerminationTest extends TestLogger {
 			new KvStateService(new KvStateRegistry(), null, null),
 			mock(BroadcastVariableManager.class),
 			new TaskEventDispatcher(),
+			ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
 			new TestTaskStateManager(),
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 05eec10..ca5ee61 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
@@ -247,6 +248,7 @@ public class SynchronousCheckpointITCase {
 				new KvStateService(new KvStateRegistry(), null, null),
 				mock(BroadcastVariableManager.class),
 				new TaskEventDispatcher(),
+				ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
 				new TestTaskStateManager(),
 				mock(TaskManagerActions.class),
 				mock(InputSplitProvider.class),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 9619bea..4d8f3a7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
@@ -213,6 +214,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 				new KvStateService(new KvStateRegistry(), null, null),
 				mock(BroadcastVariableManager.class),
 				new TaskEventDispatcher(),
+				ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
 				new TestTaskStateManager(),
 				mock(TaskManagerActions.class),
 				mock(InputSplitProvider.class),