You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/05/17 15:39:12 UTC
[flink] 10/11: [FLINK-17407] Instatiate and pass
ExternalResourceInfoProvider to RuntimeContext
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 22112f12b07d20aed43705776cf93fbdc115ed23
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Fri Apr 24 12:06:24 2020 +0800
[FLINK-17407] Instatiate and pass ExternalResourceInfoProvider to RuntimeContext
---
.../flink/api/common/functions/RuntimeContext.java | 11 ++
.../common/functions/util/RuntimeUDFContext.java | 7 ++
.../flink/core/plugin/TestingPluginManager.java | 42 ++++++++
.../flink/cep/operator/CepRuntimeContext.java | 7 ++
.../state/api/runtime/SavepointEnvironment.java | 8 ++
.../state/api/runtime/SavepointRuntimeContext.java | 7 ++
.../flink/runtime/execution/Environment.java | 8 ++
.../ExternalResourceInfoProvider.java | 40 +++++++
.../externalresource/ExternalResourceUtils.java | 114 +++++++++++++++++++-
.../StaticExternalResourceInfoProvider.java | 53 +++++++++
.../iterative/task/AbstractIterativeTask.java | 9 +-
.../flink/runtime/minicluster/MiniCluster.java | 2 +
.../apache/flink/runtime/operators/BatchTask.java | 2 +-
.../flink/runtime/operators/DataSinkTask.java | 2 +-
.../flink/runtime/operators/DataSourceTask.java | 3 +-
.../runtime/operators/chaining/ChainedDriver.java | 2 +-
.../util/DistributedRuntimeUDFContext.java | 16 ++-
.../flink/runtime/taskexecutor/TaskExecutor.java | 7 ++
.../runtime/taskexecutor/TaskManagerRunner.java | 10 ++
.../runtime/taskmanager/RuntimeEnvironment.java | 11 +-
.../org/apache/flink/runtime/taskmanager/Task.java | 9 +-
.../ExternalResourceUtilsTest.java | 120 +++++++++++++++++++++
.../TestingExternalResourceDriver.java | 43 ++++++++
.../TestingExternalResourceDriverFactory.java | 34 ++++++
...TestingFailedExternalResourceDriverFactory.java | 34 ++++++
.../operators/testutils/DummyEnvironment.java | 6 ++
.../operators/testutils/MockEnvironment.java | 13 ++-
.../testutils/MockEnvironmentBuilder.java | 10 +-
.../TaskExecutorPartitionLifecycleTest.java | 2 +
.../taskexecutor/TaskExecutorSlotLifetimeTest.java | 2 +
.../runtime/taskexecutor/TaskExecutorTest.java | 3 +
.../taskexecutor/TaskManagerRunnerStartupTest.java | 2 +
.../TaskSubmissionTestEnvironment.java | 2 +
.../runtime/taskexecutor/TestingTaskExecutor.java | 3 +
.../runtime/taskmanager/TaskAsyncCallTest.java | 2 +
.../flink/runtime/taskmanager/TestTaskBuilder.java | 8 ++
.../runtime/util/JvmExitOnFatalErrorTest.java | 2 +
.../api/functions/async/RichAsyncFunction.java | 7 ++
.../api/operators/AbstractStreamOperator.java | 3 +-
.../api/operators/AbstractStreamOperatorV2.java | 3 +-
.../api/operators/StreamingRuntimeContext.java | 16 ++-
.../api/operators/StreamingRuntimeContextTest.java | 4 +-
.../tasks/InterruptSensitiveRestoreTest.java | 2 +
.../runtime/tasks/StreamMockEnvironment.java | 6 ++
.../runtime/tasks/StreamTaskTerminationTest.java | 2 +
.../runtime/tasks/SynchronousCheckpointITCase.java | 2 +
.../tasks/TaskCheckpointingBehaviourTest.java | 2 +
47 files changed, 684 insertions(+), 19 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index efebf99..18c6e5a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
@@ -44,6 +45,7 @@ import org.apache.flink.metrics.MetricGroup;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
@@ -176,6 +178,15 @@ public interface RuntimeContext {
@PublicEvolving
Histogram getHistogram(String name);
+ /**
+ * Get the specific external resource information by the resourceName.
+ *
+ * @param resourceName of the required external resource
+ * @return information set of the external resource identified by the resourceName
+ */
+ @PublicEvolving
+ Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName);
+
// --------------------------------------------------------------------------------------------
/**
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
index bab2eb0..a069e61 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.core.fs.Path;
@@ -30,6 +31,7 @@ import org.apache.flink.metrics.MetricGroup;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Future;
/**
@@ -102,6 +104,11 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
}
}
+ @Override
+ public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
+ throw new UnsupportedOperationException("Do not support external resource in current environment");
+ }
+
// --------------------------------------------------------------------------------------------
public void setBroadcastVariable(String name, List<?> value) {
diff --git a/flink-core/src/test/java/org/apache/flink/core/plugin/TestingPluginManager.java b/flink-core/src/test/java/org/apache/flink/core/plugin/TestingPluginManager.java
new file mode 100644
index 0000000..455d55c
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/plugin/TestingPluginManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.commons.collections.IteratorUtils;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link PluginManager} for testing purpose.
+ */
+public class TestingPluginManager implements PluginManager {
+
+ private final Map<Class<?>, Iterator<?>> plugins;
+
+ public TestingPluginManager(Map<Class<?>, Iterator<?>> plugins) {
+ this.plugins = plugins;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <P> Iterator<P> load(Class<P> service) {
+ return (Iterator<P>) plugins.getOrDefault(service, IteratorUtils.emptyIterator());
+ }
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
index 0518dad..af7ca0c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.AggregatingState;
@@ -45,6 +46,7 @@ import org.apache.flink.metrics.MetricGroup;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -115,6 +117,11 @@ class CepRuntimeContext implements RuntimeContext {
return runtimeContext.getDistributedCache();
}
+ @Override
+ public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName){
+ return runtimeContext.getExternalResourceInfos(resourceName);
+ }
+
// -----------------------------------------------------------------------------------
// Unsupported operations
// -----------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
index 93f3fbd..fca7a74 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -199,6 +200,13 @@ public class SavepointEnvironment implements Environment {
}
@Override
+ public ExternalResourceInfoProvider getExternalResourceInfoProvider() {
+ // We will still construct a StreamingRuntimeContext from this SavepointEnvironment at the moment. So, throwing
+ // Exception here would cause issue. When there is a bounded DataStream API, this class would be removed.
+ return ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES;
+ }
+
+ @Override
public AccumulatorRegistry getAccumulatorRegistry() {
return accumulatorRegistry;
}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
index c3002c8..dc25468 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.AggregatingState;
@@ -50,6 +51,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* A streaming {@link RuntimeContext} which delegates to the underlying batch {@code RuntimeContext}
@@ -162,6 +164,11 @@ public final class SavepointRuntimeContext implements RuntimeContext {
}
@Override
+ public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
+ throw new UnsupportedOperationException("Do not support external resource in current environment");
+ }
+
+ @Override
public boolean hasBroadcastVariable(String name) {
return ctx.hasBroadcastVariable(name);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index c29d909..cee6c6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -158,6 +159,13 @@ public interface Environment {
GlobalAggregateManager getGlobalAggregateManager();
/**
+ * Get the {@link ExternalResourceInfoProvider} which contains infos of available external resources.
+ *
+ * @return {@link ExternalResourceInfoProvider} which contains infos of available external resources
+ */
+ ExternalResourceInfoProvider getExternalResourceInfoProvider();
+
+ /**
* Return the registry for accumulators which are periodically sent to the job manager.
* @return the registry
*/
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceInfoProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceInfoProvider.java
new file mode 100644
index 0000000..f4f92fd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceInfoProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Provide the information of external resources.
+ */
+public interface ExternalResourceInfoProvider {
+
+ ExternalResourceInfoProvider NO_EXTERNAL_RESOURCES = resourceName -> Collections.emptySet();
+
+ /**
+ * Get the specific external resource information by the resourceName.
+ *
+ * @param resourceName of the required external resource
+ * @return information set of the external resource identified by the resourceName
+ */
+ Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
index e3d41e0..efa3752 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
@@ -18,9 +18,14 @@
package org.apache.flink.runtime.externalresource;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
@@ -29,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -54,7 +60,12 @@ public class ExternalResourceUtils {
}
/**
- * Get the external resources map.
+ * Get the external resources map. The key should be used for deployment specific container request,
+ * and values should be the amount of that resource.
+ *
+ * @param config Configurations
+ * @param suffix suffix of config option for deployment specific configuration key
+ * @return external resources map, map the amount to the configuration key for deployment specific container request
*/
public static Map<String, Long> getExternalResources(Configuration config, String suffix) {
final Set<String> resourceSet = getExternalResourceSet(config);
@@ -98,4 +109,105 @@ public class ExternalResourceUtils {
return externalResourceConfigs;
}
+
+ /**
+ * Get the map of resource name and amount of all of enabled external resources.
+ */
+ public static Map<String, Long> getExternalResourceAmountMap(Configuration config) {
+ final Set<String> resourceSet = getExternalResourceSet(config);
+ LOG.info("Enabled external resources: {}", resourceSet);
+
+ if (resourceSet.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ final Map<String, Long> externalResourceAmountMap = new HashMap<>();
+ for (String resourceName: resourceSet) {
+ final ConfigOption<Long> amountOption =
+ key(ExternalResourceOptions.getAmountConfigOptionForResource(resourceName))
+ .longType()
+ .noDefaultValue();
+ final Optional<Long> amountOpt = config.getOptional(amountOption);
+ if (!amountOpt.isPresent()) {
+ LOG.warn("The amount of the {} should be configured. Will ignore that resource.", resourceName);
+ } else if (amountOpt.get() <= 0) {
+ LOG.warn("The amount of the {} should be positive while finding {}. Will ignore that resource.", amountOpt.get(), resourceName);
+ } else {
+ externalResourceAmountMap.put(resourceName, amountOpt.get());
+ }
+ }
+
+ return externalResourceAmountMap;
+ }
+
+ /**
+ * Instantiate the {@link ExternalResourceDriver ExternalResourceDrivers} for all of enabled external resources. {@link ExternalResourceDriver ExternalResourceDrivers}
+ * are mapped to its resource name.
+ */
+ public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) {
+ final Set<String> resourceSet = getExternalResourceSet(config);
+ LOG.info("Enabled external resources: {}", resourceSet);
+
+ if (resourceSet.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class);
+ final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>();
+ factoryIterator.forEachRemaining(
+ externalResourceDriverFactory -> externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory));
+
+ final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>();
+ for (String resourceName: resourceSet) {
+ final ConfigOption<String> driverClassOption =
+ key(ExternalResourceOptions.getExternalResourceDriverFactoryConfigOptionForResource(resourceName))
+ .stringType()
+ .noDefaultValue();
+ final String driverFactoryClassName = config.getString(driverClassOption);
+ if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) {
+ LOG.warn("Could not find driver class name for {}. Please make sure {} is configured.",
+ resourceName, driverClassOption.key());
+ continue;
+ }
+
+ ExternalResourceDriverFactory externalResourceDriverFactory = externalResourceFactories.get(driverFactoryClassName);
+ if (externalResourceDriverFactory != null) {
+ DelegatingConfiguration delegatingConfiguration =
+ new DelegatingConfiguration(config, ExternalResourceOptions.getExternalResourceParamConfigPrefixForResource(resourceName));
+ try {
+ externalResourceDrivers.put(resourceName, externalResourceDriverFactory.createExternalResourceDriver(delegatingConfiguration));
+ LOG.info("Add external resources driver for {}.", resourceName);
+ } catch (Exception e) {
+ LOG.warn("Could not instantiate driver with factory {} for {}. {}", driverFactoryClassName, resourceName, e);
+ }
+ } else {
+ LOG.warn("Could not find factory class {} for {}.", driverFactoryClassName, resourceName);
+ }
+ }
+
+ return externalResourceDrivers;
+ }
+
+ /**
+ * Instantiate {@link StaticExternalResourceInfoProvider} for all of enabled external resources.
+ */
+ public static ExternalResourceInfoProvider createStaticExternalResourceInfoProvider(Map<String, Long> externalResourceAmountMap, Map<String, ExternalResourceDriver> externalResourceDrivers) {
+ final Map<String, Set<? extends ExternalResourceInfo>> externalResources = new HashMap<>();
+ for (Map.Entry<String, ExternalResourceDriver> externalResourceDriverEntry : externalResourceDrivers.entrySet()) {
+ final String resourceName = externalResourceDriverEntry.getKey();
+ final ExternalResourceDriver externalResourceDriver = externalResourceDriverEntry.getValue();
+ if (externalResourceAmountMap.containsKey(resourceName)) {
+ try {
+ final Set<? extends ExternalResourceInfo> externalResourceInfos;
+ externalResourceInfos = externalResourceDriver.retrieveResourceInfo(externalResourceAmountMap.get(resourceName));
+ externalResources.put(resourceName, externalResourceInfos);
+ } catch (Exception e) {
+ LOG.warn("Failed to retrieve information of external resource {}.", resourceName, e);
+ }
+ } else {
+ LOG.warn("Could not found legal amount configuration for {}.", resourceName);
+ }
+ }
+ return new StaticExternalResourceInfoProvider(externalResources);
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/StaticExternalResourceInfoProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/StaticExternalResourceInfoProvider.java
new file mode 100644
index 0000000..d124c5a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/StaticExternalResourceInfoProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Static implementation of {@link ExternalResourceInfoProvider} which return fixed collection
+ * of {@link ExternalResourceInfo}.
+ */
+public class StaticExternalResourceInfoProvider implements ExternalResourceInfoProvider {
+
+ private final Map<String, Set<? extends ExternalResourceInfo>> externalResources;
+
+ public StaticExternalResourceInfoProvider(Map<String, Set<? extends ExternalResourceInfo>> externalResources) {
+ this.externalResources = externalResources;
+ }
+
+ @Override
+ public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
+ if (!externalResources.containsKey(resourceName)) {
+ return Collections.emptySet();
+ }
+
+ return Collections.unmodifiableSet(externalResources.get(resourceName));
+ }
+
+ @VisibleForTesting
+ Map<String, Set<? extends ExternalResourceInfo>> getExternalResources() {
+ return externalResources;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
index 19eb3d1..edaa924 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
@@ -183,8 +184,9 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
@Override
public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup metrics) {
Environment env = getEnvironment();
+
return new IterativeRuntimeUdfContext(env.getTaskInfo(), getUserCodeClassLoader(),
- getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorMap, metrics);
+ getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorMap, metrics, env.getExternalResourceInfoProvider());
}
// --------------------------------------------------------------------------------------------
@@ -375,8 +377,9 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext {
public IterativeRuntimeUdfContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
- Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulatorMap, MetricGroup metrics) {
- super(taskInfo, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap, metrics);
+ Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulatorMap,
+ MetricGroup metrics, ExternalResourceInfoProvider externalResourceInfoProvider) {
+ super(taskInfo, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap, metrics, externalResourceInfoProvider);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 75e4740..44e0ad9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceMa
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -536,6 +537,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
metricRegistry,
blobCacheService,
useLocalCommunication(),
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));
taskExecutor.start();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index d22d472..a51f376 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -1035,7 +1035,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
Environment env = getEnvironment();
return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
- getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorMap, metrics);
+ getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorMap, metrics, env.getExternalResourceInfoProvider());
}
// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index f2dc9bc..3c80d8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -415,6 +415,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
- getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()));
+ getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()), env.getExternalResourceInfoProvider());
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index ba1bb08..6b8dd6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -404,8 +404,9 @@ public class DataSourceTask<OT> extends AbstractInvokable {
String sourceName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
+
return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
- getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
+ getEnvironment().getMetricGroup().getOrAddOperator(sourceName), env.getExternalResourceInfoProvider());
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index 7dd3a14..f2a782e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -80,7 +80,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
this.udfContext = ((BatchTask<?, ?>) parent).createRuntimeContext(metrics);
} else {
this.udfContext = new DistributedRuntimeUDFContext(env.getTaskInfo(), userCodeClassLoader,
- parent.getExecutionConfig(), env.getDistributedCacheEntries(), accumulatorMap, metrics
+ parent.getExecutionConfig(), env.getDistributedCacheEntries(), accumulatorMap, metrics, env.getExternalResourceInfoProvider()
);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
index 35c7bab..3c3fcbe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
@@ -21,11 +21,13 @@ package org.apache.flink.runtime.operators.util;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Future;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
@@ -33,6 +35,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization;
import org.apache.flink.runtime.broadcast.InitializationTypeConflictException;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.util.Preconditions;
/**
@@ -41,10 +44,14 @@ import org.apache.flink.util.Preconditions;
public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext {
private final HashMap<String, BroadcastVariableMaterialization<?, ?>> broadcastVars = new HashMap<String, BroadcastVariableMaterialization<?, ?>>();
-
+
+ private final ExternalResourceInfoProvider externalResourceInfoProvider;
+
public DistributedRuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
- Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators, MetricGroup metrics) {
+ Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulators,
+ MetricGroup metrics, ExternalResourceInfoProvider externalResourceInfoProvider) {
super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks, metrics);
+ this.externalResourceInfoProvider = Preconditions.checkNotNull(externalResourceInfoProvider);
}
@Override
@@ -87,6 +94,11 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext {
throw new IllegalArgumentException("The broadcast variable with name '" + name + "' has not been set.");
}
}
+
+ @Override
+ public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
+ return externalResourceInfoProvider.getExternalResourceInfos(resourceName);
+ }
// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 59d72ff..d74532f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
@@ -202,6 +203,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
/** The state manager for this task, providing state managers per slot. */
private final TaskExecutorLocalStateStoresManager localStateStoresManager;
+ /** Information provider for external resources. */
+ private final ExternalResourceInfoProvider externalResourceInfoProvider;
+
/** The network component in the task manager. */
private final ShuffleEnvironment<?, ?> shuffleEnvironment;
@@ -257,6 +261,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
TaskManagerConfiguration taskManagerConfiguration,
HighAvailabilityServices haServices,
TaskManagerServices taskExecutorServices,
+ ExternalResourceInfoProvider externalResourceInfoProvider,
HeartbeatServices heartbeatServices,
TaskManagerMetricGroup taskManagerMetricGroup,
@Nullable String metricQueryServiceAddress,
@@ -278,6 +283,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
this.blobCacheService = checkNotNull(blobCacheService);
this.metricQueryServiceAddress = metricQueryServiceAddress;
this.backPressureSampleService = checkNotNull(backPressureSampleService);
+ this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);
this.libraryCacheManager = taskExecutorServices.getLibraryCacheManager();
this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
@@ -620,6 +626,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
taskExecutorServices.getKvStateService(),
taskExecutorServices.getBroadcastVariableManager(),
taskExecutorServices.getTaskEventDispatcher(),
+ externalResourceInfoProvider,
taskStateManager,
taskManagerActions,
inputSplitProvider,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index f4add3b..88786bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -145,6 +147,11 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
configuration, highAvailabilityServices.createBlobStore(), null
);
+ final ExternalResourceInfoProvider externalResourceInfoProvider =
+ ExternalResourceUtils.createStaticExternalResourceInfoProvider(
+ ExternalResourceUtils.getExternalResourceAmountMap(configuration),
+ ExternalResourceUtils.externalResourceDriversFromConfig(configuration, pluginManager));
+
taskManager = startTaskManager(
this.configuration,
this.resourceId,
@@ -154,6 +161,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
metricRegistry,
blobCacheService,
false,
+ externalResourceInfoProvider,
this);
this.terminationFuture = new CompletableFuture<>();
@@ -330,6 +338,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
MetricRegistry metricRegistry,
BlobCacheService blobCacheService,
boolean localCommunicationOnly,
+ ExternalResourceInfoProvider externalResourceInfoProvider,
FatalErrorHandler fatalErrorHandler) throws Exception {
checkNotNull(configuration);
@@ -373,6 +382,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
taskManagerConfiguration,
highAvailabilityServices,
taskManagerServices,
+ externalResourceInfoProvider,
heartbeatServices,
taskManagerMetricGroup.f0,
metricQueryServiceAddress,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index cd04bd3..ca79f91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -70,6 +71,7 @@ public class RuntimeEnvironment implements Environment {
private final TaskStateManager taskStateManager;
private final GlobalAggregateManager aggregateManager;
private final InputSplitProvider splitProvider;
+ private final ExternalResourceInfoProvider externalResourceInfoProvider;
private final Map<String, Future<Path>> distCacheEntries;
@@ -117,7 +119,8 @@ public class RuntimeEnvironment implements Environment {
TaskOperatorEventGateway operatorEventGateway,
TaskManagerRuntimeInfo taskManagerInfo,
TaskMetricGroup metrics,
- Task containingTask) {
+ Task containingTask,
+ ExternalResourceInfoProvider externalResourceInfoProvider) {
this.jobId = checkNotNull(jobId);
this.jobVertexId = checkNotNull(jobVertexId);
@@ -144,6 +147,7 @@ public class RuntimeEnvironment implements Environment {
this.taskManagerInfo = checkNotNull(taskManagerInfo);
this.containingTask = containingTask;
this.metrics = metrics;
+ this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);
}
// ------------------------------------------------------------------------
@@ -269,6 +273,11 @@ public class RuntimeEnvironment implements Environment {
}
@Override
+ public ExternalResourceInfoProvider getExternalResourceInfoProvider() {
+ return externalResourceInfoProvider;
+ }
+
+ @Override
public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
acknowledgeCheckpoint(checkpointId, checkpointMetrics, null);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index dad0b68..2e3c9b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -193,6 +194,9 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
private final TaskEventDispatcher taskEventDispatcher;
+ /** Information provider for external resources. */
+ private final ExternalResourceInfoProvider externalResourceInfoProvider;
+
/** The manager for state of operators running in this task/slot. */
private final TaskStateManager taskStateManager;
@@ -294,6 +298,7 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
KvStateService kvStateService,
BroadcastVariableManager bcVarManager,
TaskEventDispatcher taskEventDispatcher,
+ ExternalResourceInfoProvider externalResourceInfoProvider,
TaskStateManager taskStateManager,
TaskManagerActions taskManagerActions,
InputSplitProvider inputSplitProvider,
@@ -351,6 +356,7 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
this.operatorCoordinatorEventGateway = Preconditions.checkNotNull(operatorCoordinatorEventGateway);
this.aggregateManager = Preconditions.checkNotNull(aggregateManager);
this.taskManagerActions = checkNotNull(taskManagerActions);
+ this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);
this.classLoaderHandle = Preconditions.checkNotNull(classLoaderHandle);
this.fileCache = Preconditions.checkNotNull(fileCache);
@@ -680,7 +686,8 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
operatorCoordinatorEventGateway,
taskManagerConfig,
metrics,
- this);
+ this,
+ externalResourceInfoProvider);
// Make sure the user code classloader is accessible thread-locally.
// We are setting the correct context class loader before instantiating the invokable
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
index 707bd4b..fb51c29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
@@ -18,18 +18,28 @@
package org.apache.flink.runtime.externalresource;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.TestingPluginManager;
import org.apache.flink.util.TestLogger;
+import org.apache.commons.collections.IteratorUtils;
import org.junit.Test;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -123,4 +133,114 @@ public class ExternalResourceUtilsTest extends TestLogger {
assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), is(RESOURCE_AMOUNT_1));
assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), is(RESOURCE_AMOUNT_2));
}
+
+ @Test
+ public void testConstructExternalResourceDriversFromConfig() {
+ final Configuration config = new Configuration();
+ final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName();
+ final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+ plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+ final PluginManager testingPluginManager = new TestingPluginManager(plugins);
+
+ config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1));
+ config.setString(ExternalResourceOptions.getExternalResourceDriverFactoryConfigOptionForResource(RESOURCE_NAME_1), driverFactoryClassName);
+
+ final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManager);
+
+ assertThat(externalResourceDrivers.size(), is(1));
+ assertThat(externalResourceDrivers.get(RESOURCE_NAME_1), instanceOf(TestingExternalResourceDriver.class));
+ }
+
+ @Test
+ public void testNotConfiguredFactoryClass() {
+ final Configuration config = new Configuration();
+ final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+ plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+ final PluginManager testingPluginManager = new TestingPluginManager(plugins);
+
+ config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1));
+
+ final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManager);
+
+ assertThat(externalResourceDrivers.entrySet(), is(empty()));
+ }
+
+ @Test
+ public void testFactoryPluginDoesNotExist() {
+ final Configuration config = new Configuration();
+ final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName();
+ final PluginManager testingPluginManager = new TestingPluginManager(Collections.emptyMap());
+
+ config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1));
+ config.setString(ExternalResourceOptions.getExternalResourceDriverFactoryConfigOptionForResource(RESOURCE_NAME_1), driverFactoryClassName);
+
+ final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManager);
+
+ assertThat(externalResourceDrivers.entrySet(), is(empty()));
+ }
+
+ @Test
+ public void testFactoryFailedToCreateDriver() {
+ final Configuration config = new Configuration();
+ final String driverFactoryClassName = TestingFailedExternalResourceDriverFactory.class.getName();
+ final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+ plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingFailedExternalResourceDriverFactory()));
+ final PluginManager testingPluginManager = new TestingPluginManager(plugins);
+
+ config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1));
+ config.setString(ExternalResourceOptions.getExternalResourceDriverFactoryConfigOptionForResource(RESOURCE_NAME_1), driverFactoryClassName);
+
+ final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManager);
+
+ assertThat(externalResourceDrivers.entrySet(), is(empty()));
+ }
+
+ @Test
+ public void testGetExternalResourceInfoProvider() {
+ final Map<String, Long> externalResourceAmountMap = new HashMap<>();
+ final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>();
+ externalResourceAmountMap.put(RESOURCE_NAME_1, RESOURCE_AMOUNT_1);
+ externalResourceDrivers.put(RESOURCE_NAME_1, new TestingExternalResourceDriver());
+
+ final StaticExternalResourceInfoProvider externalResourceInfoProvider =
+ (StaticExternalResourceInfoProvider) ExternalResourceUtils.createStaticExternalResourceInfoProvider(externalResourceAmountMap, externalResourceDrivers);
+
+ assertNotNull(externalResourceInfoProvider.getExternalResources().get(RESOURCE_NAME_1));
+ }
+
+ @Test
+ public void testGetExternalResourceInfoProviderWithoutAmount() {
+ final Map<String, Long> externalResourceAmountMap = new HashMap<>();
+ final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>();
+ externalResourceDrivers.put(RESOURCE_NAME_1, new TestingExternalResourceDriver());
+
+ final StaticExternalResourceInfoProvider externalResourceInfoProvider =
+ (StaticExternalResourceInfoProvider) ExternalResourceUtils.createStaticExternalResourceInfoProvider(externalResourceAmountMap, externalResourceDrivers);
+
+ assertThat(externalResourceInfoProvider.getExternalResources().entrySet(), is(empty()));
+ }
+
+ @Test
+ public void testGetExternalResourceAmountMap() {
+ final Configuration config = new Configuration();
+ config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1));
+ config.setLong(ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), RESOURCE_AMOUNT_1);
+
+ final Map<String, Long> externalResourceAmountMap = ExternalResourceUtils.getExternalResourceAmountMap(config);
+
+ assertThat(externalResourceAmountMap.size(), is(1));
+ assertTrue(externalResourceAmountMap.containsKey(RESOURCE_NAME_1));
+ assertThat(externalResourceAmountMap.get(RESOURCE_NAME_1), is(RESOURCE_AMOUNT_1));
+ }
+
+ @Test
+ public void testGetExternalResourceAmountMapWithIllegalAmount() {
+ final Configuration config = new Configuration();
+ config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1));
+ config.setLong(ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), 0);
+
+ final Map<String, Long> externalResourceAmountMap = ExternalResourceUtils.getExternalResourceAmountMap(config);
+
+ assertThat(externalResourceAmountMap.entrySet(), is(empty()));
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriver.java
new file mode 100644
index 0000000..ce6387b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriver.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * No-op {@link ExternalResourceDriver} for testing purpose.
+ */
+public class TestingExternalResourceDriver implements ExternalResourceDriver {
+
+ private int callTimes = 0;
+
+ @Override
+ public Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) {
+ callTimes += 1;
+ return Collections.emptySet();
+ }
+
+ public int getCallTimes() {
+ return callTimes;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriverFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriverFactory.java
new file mode 100644
index 0000000..d08e614
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriverFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Implementation of {@link ExternalResourceDriverFactory} for testing purpose.
+ */
+public class TestingExternalResourceDriverFactory implements ExternalResourceDriverFactory {
+
+ @Override
+ public ExternalResourceDriver createExternalResourceDriver(Configuration config) {
+ return new TestingExternalResourceDriver();
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingFailedExternalResourceDriverFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingFailedExternalResourceDriverFactory.java
new file mode 100644
index 0000000..449c310
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingFailedExternalResourceDriverFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Implementation of {@link ExternalResourceDriverFactory} for testing purpose which fails to create an {@link ExternalResourceDriver}.
+ */
+public class TestingFailedExternalResourceDriverFactory implements ExternalResourceDriverFactory {
+
+ @Override
+ public ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception {
+ throw new Exception();
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index d233d79..15b6d07 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -197,6 +198,11 @@ public class DummyEnvironment implements Environment {
}
@Override
+ public ExternalResourceInfoProvider getExternalResourceInfoProvider() {
+ return ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES;
+ }
+
+ @Override
public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 2e85a4f..8f97a5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.RecordCollectingResultPartitionWriter;
@@ -119,6 +120,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
private final TaskMetricGroup taskMetricGroup;
+ private final ExternalResourceInfoProvider externalResourceInfoProvider;
+
public static MockEnvironmentBuilder builder() {
return new MockEnvironmentBuilder();
}
@@ -140,7 +143,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
ClassLoader userCodeClassLoader,
TaskMetricGroup taskMetricGroup,
TaskManagerRuntimeInfo taskManagerRuntimeInfo,
- MemoryManager memManager) {
+ MemoryManager memManager,
+ ExternalResourceInfoProvider externalResourceInfoProvider) {
this.jobID = jobID;
this.jobVertexID = jobVertexID;
@@ -169,6 +173,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
this.aggregateManager = Preconditions.checkNotNull(aggregateManager);
this.taskMetricGroup = taskMetricGroup;
+
+ this.externalResourceInfoProvider = Preconditions.checkNotNull(externalResourceInfoProvider);
}
public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
@@ -330,6 +336,11 @@ public class MockEnvironment implements Environment, AutoCloseable {
}
@Override
+ public ExternalResourceInfoProvider getExternalResourceInfoProvider() {
+ return externalResourceInfoProvider;
+ }
+
+ @Override
public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
throw new UnsupportedOperationException();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
index d421015..bedd3b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -54,6 +55,7 @@ public class MockEnvironmentBuilder {
private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
private IOManager ioManager;
private MemoryManager memoryManager = buildMemoryManager(1024 * MemoryManager.DEFAULT_PAGE_SIZE);
+ private ExternalResourceInfoProvider externalResourceInfoProvider = ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES;
private MemoryManager buildMemoryManager(long memorySize) {
return MemoryManagerBuilder.newBuilder().setMemorySize(MemoryType.OFF_HEAP, memorySize).build();
@@ -144,6 +146,11 @@ public class MockEnvironmentBuilder {
return this;
}
+ public MockEnvironmentBuilder setExternalResourceInfoProvider(ExternalResourceInfoProvider externalResourceInfoProvider) {
+ this.externalResourceInfoProvider = externalResourceInfoProvider;
+ return this;
+ }
+
public MockEnvironment build() {
if (ioManager == null) {
ioManager = new IOManagerAsync();
@@ -165,6 +172,7 @@ public class MockEnvironmentBuilder {
userCodeClassLoader,
taskMetricGroup,
taskManagerRuntimeInfo,
- memoryManager);
+ memoryManager,
+ externalResourceInfoProvider);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 58bbfcf..9cc5d69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
@@ -458,6 +459,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
InetAddress.getLoopbackAddress().getHostAddress()),
haServices,
taskManagerServices,
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
new HeartbeatServices(10_000L, 30_000L),
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
index 097ba56..b5d0592 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
@@ -189,6 +190,7 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger {
.setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(1))
.setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
.build(),
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
new TestingHeartbeatServices(),
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 4a23726..5d0e165 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
@@ -1867,6 +1868,7 @@ public class TaskExecutorTest extends TestLogger {
InetAddress.getLoopbackAddress().getHostAddress()),
haServices,
taskManagerServices,
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
heartbeatServices,
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
null,
@@ -1889,6 +1891,7 @@ public class TaskExecutorTest extends TestLogger {
InetAddress.getLoopbackAddress().getHostAddress()),
haServices,
taskManagerServices,
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
heartbeatServices,
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index 8d5fadf..d0f2241 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -182,6 +183,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
new VoidBlobStore(),
null),
false,
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
error -> {});
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 927622c..824e0f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
@@ -220,6 +221,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
InetAddress.getLoopbackAddress().getHostAddress()),
haServices,
taskManagerServices,
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
heartbeatServices,
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
metricQueryServiceAddress,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
index 4556c40..7a4a01f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
@@ -41,6 +42,7 @@ class TestingTaskExecutor extends TaskExecutor {
TaskManagerConfiguration taskManagerConfiguration,
HighAvailabilityServices haServices,
TaskManagerServices taskExecutorServices,
+ ExternalResourceInfoProvider externalResourceInfoProvider,
HeartbeatServices heartbeatServices,
TaskManagerMetricGroup taskManagerMetricGroup,
@Nullable String metricQueryServiceAddress,
@@ -53,6 +55,7 @@ class TestingTaskExecutor extends TaskExecutor {
taskManagerConfiguration,
haServices,
taskExecutorServices,
+ externalResourceInfoProvider,
heartbeatServices,
taskManagerMetricGroup,
metricQueryServiceAddress,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 3858f34..ef77500 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
@@ -198,6 +199,7 @@ public class TaskAsyncCallTest extends TestLogger {
new KvStateService(new KvStateRegistry(), null, null),
mock(BroadcastVariableManager.class),
new TaskEventDispatcher(),
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
new TestTaskStateManager(),
mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
index 3183f01..2decf57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestTaskBuilder.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -85,6 +86,7 @@ public final class TestTaskBuilder {
private JobID jobId = new JobID();
private AllocationID allocationID = new AllocationID();
private ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
+ private ExternalResourceInfoProvider externalResourceInfoProvider = ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES;
public TestTaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) {
this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment);
@@ -170,6 +172,11 @@ public final class TestTaskBuilder {
return this;
}
+ public TestTaskBuilder setExternalResourceInfoProvider(ExternalResourceInfoProvider externalResourceInfoProvider) {
+ this.externalResourceInfoProvider = externalResourceInfoProvider;
+ return this;
+ }
+
public Task build() throws Exception {
final JobVertexID jobVertexId = new JobVertexID();
@@ -209,6 +216,7 @@ public final class TestTaskBuilder {
kvStateService,
new BroadcastVariableManager(),
new TaskEventDispatcher(),
+ externalResourceInfoProvider,
new TestTaskStateManager(),
taskManagerActions,
new MockInputSplitProvider(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 1eaf46b..eadb9d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -208,6 +209,7 @@ public class JvmExitOnFatalErrorTest {
new KvStateService(new KvStateRegistry(), null, null),
new BroadcastVariableManager(),
new TaskEventDispatcher(),
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
slotStateManager,
new NoOpTaskManagerActions(),
new NoOpInputSplitProvider(),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index 8d9ac8f..7b4ac6d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
@@ -51,6 +52,7 @@ import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Rich variant of the {@link AsyncFunction}. As a {@link RichFunction}, it gives access to the
@@ -149,6 +151,11 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction im
return runtimeContext.getUserCodeClassLoader();
}
+ @Override
+ public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
+ return runtimeContext.getExternalResourceInfos(resourceName);
+ }
+
// -----------------------------------------------------------------------------------
// Unsupported operations
// -----------------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 8a4d215..a698254 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -212,7 +212,8 @@ public abstract class AbstractStreamOperator<OUT>
getMetricGroup(),
getOperatorID(),
getProcessingTimeService(),
- null);
+ null,
+ environment.getExternalResourceInfoProvider());
stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 9f1fa89..56533db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -143,7 +143,8 @@ public abstract class AbstractStreamOperatorV2<OUT> implements StreamOperator<OU
operatorMetricGroup,
getOperatorID(),
processingTimeService,
- null);
+ null,
+ environment.getExternalResourceInfoProvider());
}
private LatencyStats createLatencyStats(Configuration taskManagerConfig, int indexInSubtaskGroup) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 65b54a7..e451563 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
import org.apache.flink.api.common.state.AggregatingState;
@@ -39,6 +40,7 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
@@ -51,6 +53,7 @@ import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -67,6 +70,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
private final String operatorUniqueID;
private final ProcessingTimeService processingTimeService;
private @Nullable KeyedStateStore keyedStateStore;
+ private final ExternalResourceInfoProvider externalResourceInfoProvider;
@VisibleForTesting
public StreamingRuntimeContext(
@@ -79,7 +83,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
operator.getMetricGroup(),
operator.getOperatorID(),
operator.getProcessingTimeService(),
- operator.getKeyedStateStore());
+ operator.getKeyedStateStore(),
+ env.getExternalResourceInfoProvider());
}
public StreamingRuntimeContext(
@@ -88,7 +93,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
MetricGroup operatorMetricGroup,
OperatorID operatorID,
ProcessingTimeService processingTimeService,
- @Nullable KeyedStateStore keyedStateStore) {
+ @Nullable KeyedStateStore keyedStateStore,
+ ExternalResourceInfoProvider externalResourceInfoProvider) {
super(checkNotNull(env).getTaskInfo(),
env.getUserClassLoader(),
env.getExecutionConfig(),
@@ -100,6 +106,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
this.operatorUniqueID = checkNotNull(operatorID).toString();
this.processingTimeService = processingTimeService;
this.keyedStateStore = keyedStateStore;
+ this.externalResourceInfoProvider = externalResourceInfoProvider;
}
public void setKeyedStateStore(@Nullable KeyedStateStore keyedStateStore) {
@@ -150,6 +157,11 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
return taskEnvironment.getTaskManagerInfo();
}
+ @Override
+ public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
+ return externalResourceInfoProvider.getExternalResourceInfos(resourceName);
+ }
+
// ------------------------------------------------------------------------
// broadcast variables
// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 1c2390c..fff4ef3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -290,7 +291,8 @@ public class StreamingRuntimeContextTest {
operator.getMetricGroup(),
operator.getOperatorID(),
operator.getProcessingTimeService(),
- operator.getKeyedStateStore());
+ operator.getKeyedStateStore(),
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES);
}
@SuppressWarnings("unchecked")
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 35a4936..d700920 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
@@ -269,6 +270,7 @@ public class InterruptSensitiveRestoreTest {
new KvStateService(new KvStateRegistry(), null, null),
mock(BroadcastVariableManager.class),
new TaskEventDispatcher(),
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
taskStateManager,
mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 0092f02..01770f8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -320,6 +321,11 @@ public class StreamMockEnvironment implements Environment {
}
@Override
+ public ExternalResourceInfoProvider getExternalResourceInfoProvider() {
+ return ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES;
+ }
+
+ @Override
public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index cda5d02..4d9690c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
@@ -172,6 +173,7 @@ public class StreamTaskTerminationTest extends TestLogger {
new KvStateService(new KvStateRegistry(), null, null),
mock(BroadcastVariableManager.class),
new TaskEventDispatcher(),
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
new TestTaskStateManager(),
mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 05eec10..ca5ee61 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
@@ -247,6 +248,7 @@ public class SynchronousCheckpointITCase {
new KvStateService(new KvStateRegistry(), null, null),
mock(BroadcastVariableManager.class),
new TaskEventDispatcher(),
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
new TestTaskStateManager(),
mock(TaskManagerActions.class),
mock(InputSplitProvider.class),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 9619bea..4d8f3a7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
@@ -213,6 +214,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
new KvStateService(new KvStateRegistry(), null, null),
mock(BroadcastVariableManager.class),
new TaskEventDispatcher(),
+ ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
new TestTaskStateManager(),
mock(TaskManagerActions.class),
mock(InputSplitProvider.class),