You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2023/01/20 10:45:06 UTC

[flink] branch master updated: [FLINK-30704][filesystems][s3] Add S3 delegation token support

This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0141f13ca80 [FLINK-30704][filesystems][s3] Add S3 delegation token support
0141f13ca80 is described below

commit 0141f13ca801d5db45435d101a9c3ef83889bbc0
Author: Gabor Somogyi <ga...@apple.com>
AuthorDate: Fri Jan 6 12:20:42 2023 +0100

    [FLINK-30704][filesystems][s3] Add S3 delegation token support
---
 .../flink/core/plugin/DefaultPluginManager.java    |  42 +++++++-
 .../security/token/DelegationTokenProvider.java    |   8 +-
 .../security/token/DelegationTokenReceiver.java    |   2 +-
 .../fs/s3/common/AbstractS3FileSystemFactory.java  |   2 +
 .../DynamicTemporaryAWSCredentialsProvider.java    |  65 ++++++++++++
 .../s3/common/token/S3DelegationTokenProvider.java | 112 +++++++++++++++++++++
 .../s3/common/token/S3DelegationTokenReceiver.java | 106 +++++++++++++++++++
 ...ink.core.security.token.DelegationTokenProvider |   3 +-
 ...ink.core.security.token.DelegationTokenReceiver |   3 +-
 ...DynamicTemporaryAWSCredentialsProviderTest.java |  73 ++++++++++++++
 .../token/S3DelegationTokenProviderTest.java       |  55 ++++++++++
 .../token/S3DelegationTokenReceiverTest.java       | 105 +++++++++++++++++++
 .../runtime/entrypoint/ClusterEntrypoint.java      |   5 +-
 .../flink/runtime/minicluster/MiniCluster.java     |   8 +-
 .../token/DefaultDelegationTokenManager.java       |  74 ++++++++------
 .../DefaultDelegationTokenManagerFactory.java      |   5 +-
 .../token/DelegationTokenReceiverRepository.java   |  72 +++++++------
 .../token/hadoop/HBaseDelegationTokenProvider.java |   2 +-
 .../hadoop/HadoopDelegationTokenReceiver.java      |   2 +-
 .../hadoop/HadoopFSDelegationTokenProvider.java    |   2 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    |   2 +-
 ...nk.core.security.token.DelegationTokenProvider} |   0
 ...nk.core.security.token.DelegationTokenReceiver} |   0
 .../token/DefaultDelegationTokenManagerTest.java   |  16 +--
 .../DelegationTokenReceiverRepositoryTest.java     |   8 +-
 .../ExceptionThrowingDelegationTokenProvider.java  |   1 +
 .../ExceptionThrowingDelegationTokenReceiver.java  |   1 +
 .../token/TestDelegationTokenProvider.java         |   1 +
 .../token/TestDelegationTokenReceiver.java         |   1 +
 .../runtime/taskexecutor/TaskExecutorBuilder.java  |   2 +-
 ...cutorExecutionDeploymentReconciliationTest.java |   2 +-
 .../TaskExecutorPartitionLifecycleTest.java        |   2 +-
 .../taskexecutor/TaskExecutorSlotLifetimeTest.java |   2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java     |   4 +-
 .../taskexecutor/TaskManagerRunnerStartupTest.java |   2 +-
 .../TaskSubmissionTestEnvironment.java             |   2 +-
 ...nk.core.security.token.DelegationTokenProvider} |   0
 ...nk.core.security.token.DelegationTokenReceiver} |   0
 .../test/plugin/DefaultPluginManagerTest.java      |  43 ++++++--
 .../apache/flink/yarn/YarnClusterDescriptor.java   |   2 +-
 40 files changed, 730 insertions(+), 107 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/DefaultPluginManager.java b/flink-core/src/main/java/org/apache/flink/core/plugin/DefaultPluginManager.java
index be8825bd0ca..07a3a533900 100644
--- a/flink-core/src/main/java/org/apache/flink/core/plugin/DefaultPluginManager.java
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/DefaultPluginManager.java
@@ -21,20 +21,31 @@ package org.apache.flink.core.plugin;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 
+import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /** Default implementation of {@link PluginManager}. */
 @Internal
 @ThreadSafe
 public class DefaultPluginManager implements PluginManager {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultPluginManager.class);
+
     /**
      * Parent-classloader to all classloader that are used for plugin loading. We expect that this
      * is thread-safe.
@@ -44,6 +55,11 @@ public class DefaultPluginManager implements PluginManager {
     /** A collection of descriptions of all plugins known to this plugin manager. */
     private final Collection<PluginDescriptor> pluginDescriptors;
 
+    private final Lock pluginLoadersLock;
+
+    @GuardedBy("pluginLoadersLock")
+    private final Map<String, PluginLoader> pluginLoaders;
+
     /** List of patterns for classes that should always be resolved from the parent ClassLoader. */
     private final String[] alwaysParentFirstPatterns;
 
@@ -51,6 +67,8 @@ public class DefaultPluginManager implements PluginManager {
     DefaultPluginManager() {
         parentClassLoader = null;
         pluginDescriptors = null;
+        pluginLoadersLock = null;
+        pluginLoaders = null;
         alwaysParentFirstPatterns = null;
     }
 
@@ -67,6 +85,8 @@ public class DefaultPluginManager implements PluginManager {
             ClassLoader parentClassLoader,
             String[] alwaysParentFirstPatterns) {
         this.pluginDescriptors = pluginDescriptors;
+        this.pluginLoadersLock = new ReentrantLock();
+        this.pluginLoaders = new HashMap<>();
         this.parentClassLoader = parentClassLoader;
         this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
     }
@@ -75,9 +95,23 @@ public class DefaultPluginManager implements PluginManager {
     public <P> Iterator<P> load(Class<P> service) {
         ArrayList<Iterator<P>> combinedIterators = new ArrayList<>(pluginDescriptors.size());
         for (PluginDescriptor pluginDescriptor : pluginDescriptors) {
-            PluginLoader pluginLoader =
-                    PluginLoader.create(
-                            pluginDescriptor, parentClassLoader, alwaysParentFirstPatterns);
+            PluginLoader pluginLoader;
+            String pluginId = pluginDescriptor.getPluginId();
+            pluginLoadersLock.lock();
+            try {
+                if (pluginLoaders.containsKey(pluginId)) {
+                    LOG.info("Plugin loader with ID found, reusing it: {}", pluginId);
+                    pluginLoader = pluginLoaders.get(pluginId);
+                } else {
+                    LOG.info("Plugin loader with ID not found, creating it: {}", pluginId);
+                    pluginLoader =
+                            PluginLoader.create(
+                                    pluginDescriptor, parentClassLoader, alwaysParentFirstPatterns);
+                    pluginLoaders.putIfAbsent(pluginId, pluginLoader);
+                }
+            } finally {
+                pluginLoadersLock.unlock();
+            }
             combinedIterators.add(pluginLoader.load(service));
         }
         return Iterators.concat(combinedIterators.iterator());
@@ -90,6 +124,8 @@ public class DefaultPluginManager implements PluginManager {
                 + parentClassLoader
                 + ", pluginDescriptors="
                 + pluginDescriptors
+                + ", pluginLoaders="
+                + Joiner.on(",").withKeyValueSeparator("=").join(pluginLoaders)
                 + ", alwaysParentFirstPatterns="
                 + Arrays.toString(alwaysParentFirstPatterns)
                 + '}';
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java b/flink-core/src/main/java/org/apache/flink/core/security/token/DelegationTokenProvider.java
similarity index 91%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
rename to flink-core/src/main/java/org/apache/flink/core/security/token/DelegationTokenProvider.java
index c6d7dcea413..676148564b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
+++ b/flink-core/src/main/java/org/apache/flink/core/security/token/DelegationTokenProvider.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.core.security.token;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.configuration.Configuration;
@@ -24,8 +24,8 @@ import org.apache.flink.configuration.Configuration;
 import java.util.Optional;
 
 /**
- * Delegation token provider API. Instances of {@link DelegationTokenProvider}s are loaded by {@link
- * DelegationTokenManager} through service loader. Basically the implementation of this interface is
+ * Delegation token provider API. Instances of {@link DelegationTokenProvider}s are loaded by
+ * DelegationTokenManager through service loader. Basically the implementation of this interface is
  * responsible to produce the serialized form of tokens which will be handled by {@link
  * DelegationTokenReceiver} instances both on JobManager and TaskManager side.
  */
@@ -69,7 +69,7 @@ public interface DelegationTokenProvider {
     }
 
     /**
-     * Called by {@link DelegationTokenManager} to initialize provider after construction.
+     * Called by DelegationTokenManager to initialize provider after construction.
      *
      * @param configuration Configuration to initialize the provider.
      */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java b/flink-core/src/main/java/org/apache/flink/core/security/token/DelegationTokenReceiver.java
similarity index 97%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java
rename to flink-core/src/main/java/org/apache/flink/core/security/token/DelegationTokenReceiver.java
index b296186edc3..78918100427 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java
+++ b/flink-core/src/main/java/org/apache/flink/core/security/token/DelegationTokenReceiver.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.core.security.token;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.configuration.Configuration;
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 024abbdc12d..0dac58be3bf 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.runtime.util.HadoopConfigLoader;
 import org.apache.flink.util.Preconditions;
@@ -123,6 +124,7 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
             // create the Hadoop FileSystem
             org.apache.hadoop.conf.Configuration hadoopConfig =
                     hadoopConfigLoader.getOrLoadHadoopConfig();
+            S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
             org.apache.hadoop.fs.FileSystem fs = createHadoopFileSystem();
             fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
new file mode 100644
index 00000000000..8760f1aa3ee
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.token;
+
+import org.apache.flink.annotation.Internal;
+
+import com.amazonaws.SdkBaseException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicSessionCredentials;
+import com.amazonaws.services.securitytoken.model.Credentials;
+import org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Support dynamic session credentials for authenticating with AWS. Please note that users may
+ * reference this class name from configuration property fs.s3a.aws.credentials.provider. Therefore,
+ * changing the class name would be a backward-incompatible change. This credential provider must
+ * not fail in creation because that will break a chain of credential providers.
+ */
+@Internal
+public class DynamicTemporaryAWSCredentialsProvider implements AWSCredentialsProvider {
+
+    public static final String NAME = DynamicTemporaryAWSCredentialsProvider.class.getName();
+
+    public static final String COMPONENT = "Dynamic session credentials for Flink";
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicTemporaryAWSCredentialsProvider.class);
+
+    @Override
+    public AWSCredentials getCredentials() throws SdkBaseException {
+        Credentials credentials = S3DelegationTokenReceiver.getCredentials();
+        if (credentials == null) {
+            throw new NoAwsCredentialsException(COMPONENT);
+        }
+        LOG.debug("Providing session credentials");
+        return new BasicSessionCredentials(
+                credentials.getAccessKeyId(),
+                credentials.getSecretAccessKey(),
+                credentials.getSessionToken());
+    }
+
+    @Override
+    public void refresh() {
+        // Intentionally blank. Credentials are updated by S3DelegationTokenReceiver
+    }
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProvider.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProvider.java
new file mode 100644
index 00000000000..4c04b975908
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProvider.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.StringUtils;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
+import com.amazonaws.services.securitytoken.model.Credentials;
+import com.amazonaws.services.securitytoken.model.GetSessionTokenResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Delegation token provider for S3 filesystems. */
+@Internal
+public class S3DelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DelegationTokenProvider.class);
+
+    private String region;
+    private String accessKey;
+    private String secretKey;
+
+    @Override
+    public String serviceName() {
+        return "s3";
+    }
+
+    @Override
+    public void init(Configuration configuration) {
+        region = configuration.getString(String.format("%s.region", serviceConfigPrefix()), null);
+        if (!StringUtils.isNullOrWhitespaceOnly(region)) {
+            LOG.debug("Region: " + region);
+        }
+
+        accessKey =
+                configuration.getString(
+                        String.format("%s.access-key", serviceConfigPrefix()), null);
+        if (!StringUtils.isNullOrWhitespaceOnly(accessKey)) {
+            LOG.debug("Access key: " + accessKey);
+        }
+
+        secretKey =
+                configuration.getString(
+                        String.format("%s.secret-key", serviceConfigPrefix()), null);
+        if (!StringUtils.isNullOrWhitespaceOnly(secretKey)) {
+            LOG.debug(
+                    "Secret key: "
+                            + GlobalConfiguration.HIDDEN_CONTENT
+                            + " (sensitive information)");
+        }
+    }
+
+    @Override
+    public boolean delegationTokensRequired() {
+        if (StringUtils.isNullOrWhitespaceOnly(region)
+                || StringUtils.isNullOrWhitespaceOnly(accessKey)
+                || StringUtils.isNullOrWhitespaceOnly(secretKey)) {
+            LOG.debug("Not obtaining session credentials because not all configurations are set");
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public ObtainedDelegationTokens obtainDelegationTokens() throws Exception {
+        LOG.info("Obtaining session credentials token with access key: {}", accessKey);
+
+        AWSSecurityTokenService stsClient =
+                AWSSecurityTokenServiceClientBuilder.standard()
+                        .withRegion(region)
+                        .withCredentials(
+                                new AWSStaticCredentialsProvider(
+                                        new BasicAWSCredentials(accessKey, secretKey)))
+                        .build();
+        GetSessionTokenResult sessionTokenResult = stsClient.getSessionToken();
+        Credentials credentials = sessionTokenResult.getCredentials();
+        LOG.info(
+                "Session credentials obtained successfully with access key: {} expiration: {}",
+                credentials.getAccessKeyId(),
+                credentials.getExpiration());
+
+        return new ObtainedDelegationTokens(
+                InstantiationUtil.serializeObject(credentials),
+                Optional.of(credentials.getExpiration().getTime()));
+    }
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiver.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiver.java
new file mode 100644
index 00000000000..55dd18d3c4b
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiver.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.StringUtils;
+
+import com.amazonaws.services.securitytoken.model.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/** Delegation token receiver for S3 filesystems. */
+@Internal
+public class S3DelegationTokenReceiver implements DelegationTokenReceiver {
+
+    public static final String PROVIDER_CONFIG_NAME = "fs.s3a.aws.credentials.provider";
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DelegationTokenReceiver.class);
+
+    @VisibleForTesting @Nullable static volatile Credentials credentials;
+
+    @VisibleForTesting @Nullable static volatile String region;
+
+    public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig) {
+        LOG.info("Updating Hadoop configuration");
+
+        String providers = hadoopConfig.get(PROVIDER_CONFIG_NAME, "");
+        if (!providers.contains(DynamicTemporaryAWSCredentialsProvider.NAME)) {
+            if (providers.isEmpty()) {
+                LOG.debug("Setting provider");
+                providers = DynamicTemporaryAWSCredentialsProvider.NAME;
+            } else {
+                providers = DynamicTemporaryAWSCredentialsProvider.NAME + "," + providers;
+                LOG.debug("Prepending provider, new providers value: {}", providers);
+            }
+            hadoopConfig.set(PROVIDER_CONFIG_NAME, providers);
+        } else {
+            LOG.debug("Provider already exists");
+        }
+
+        if (!StringUtils.isNullOrWhitespaceOnly(region)) {
+            LOG.debug("Setting region");
+            hadoopConfig.set("fs.s3a.endpoint.region", region);
+        }
+
+        LOG.info("Updated Hadoop configuration successfully");
+    }
+
+    @Override
+    public String serviceName() {
+        return "s3";
+    }
+
+    @Override
+    public void init(Configuration configuration) {
+        region =
+                configuration.getString(
+                        String.format(
+                                "%s.%s.region",
+                                DelegationTokenProvider.CONFIG_PREFIX, serviceName()),
+                        null);
+        if (!StringUtils.isNullOrWhitespaceOnly(region)) {
+            LOG.debug("Region: " + region);
+        }
+    }
+
+    @Override
+    public void onNewTokensObtained(byte[] tokens) throws Exception {
+        LOG.info("Updating session credentials");
+        credentials =
+                InstantiationUtil.deserializeObject(
+                        tokens, S3DelegationTokenReceiver.class.getClassLoader());
+        LOG.info(
+                "Session credentials updated successfully with access key: {} expiration: {}",
+                credentials.getAccessKeyId(),
+                credentials.getExpiration());
+    }
+
+    @Nullable
+    public static Credentials getCredentials() {
+        return credentials;
+    }
+}
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider b/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
similarity index 83%
copy from flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
copy to flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
index bd3ed1d6482..d2e298d1a28 100644
--- a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
+++ b/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
@@ -13,5 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.runtime.security.token.TestDelegationTokenProvider
-org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenProvider
+org.apache.flink.fs.s3.common.token.S3DelegationTokenProvider
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider b/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
similarity index 83%
copy from flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
copy to flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
index bd3ed1d6482..59ba43f7f2a 100644
--- a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
+++ b/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
@@ -13,5 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.runtime.security.token.TestDelegationTokenProvider
-org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenProvider
+org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java
new file mode 100644
index 00000000000..a6b926f8737
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.token;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import com.amazonaws.auth.BasicSessionCredentials;
+import com.amazonaws.services.securitytoken.model.Credentials;
+import org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException;
+import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for {@link DynamicTemporaryAWSCredentialsProvider}. */
+public class DynamicTemporaryAWSCredentialsProviderTest {
+
+    private static final String ACCESS_KEY_ID = "testAccessKeyId";
+    private static final String SECRET_ACCESS_KEY = "testSecretAccessKey";
+    private static final String SESSION_TOKEN = "testSessionToken";
+
+    @BeforeEach
+    void beforeEach() {
+        S3DelegationTokenReceiver.credentials = null;
+    }
+
+    @AfterEach
+    void afterEach() {
+        S3DelegationTokenReceiver.credentials = null;
+    }
+
+    @Test
+    public void getCredentialsShouldThrowExceptionWhenNoCredentials() {
+        DynamicTemporaryAWSCredentialsProvider provider =
+                new DynamicTemporaryAWSCredentialsProvider();
+
+        assertThrows(NoAwsCredentialsException.class, provider::getCredentials);
+    }
+
+    @Test
+    public void getCredentialsShouldStoreCredentialsWhenCredentialsProvided() throws Exception {
+        DynamicTemporaryAWSCredentialsProvider provider =
+                new DynamicTemporaryAWSCredentialsProvider();
+        Credentials credentials =
+                new Credentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, SESSION_TOKEN, null);
+        S3DelegationTokenReceiver receiver = new S3DelegationTokenReceiver();
+
+        receiver.onNewTokensObtained(InstantiationUtil.serializeObject(credentials));
+        BasicSessionCredentials returnedCredentials =
+                (BasicSessionCredentials) provider.getCredentials();
+        assertEquals(returnedCredentials.getAWSAccessKeyId(), credentials.getAccessKeyId());
+        assertEquals(returnedCredentials.getAWSSecretKey(), credentials.getSecretAccessKey());
+        assertEquals(returnedCredentials.getSessionToken(), credentials.getSessionToken());
+    }
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProviderTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProviderTest.java
new file mode 100644
index 00000000000..d0109cbc44d
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProviderTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.token;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+
+import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link S3DelegationTokenProvider}. */
+public class S3DelegationTokenProviderTest {
+
+    private static final String REGION = "testRegion";
+    private static final String ACCESS_KEY_ID = "testAccessKeyId";
+    private static final String SECRET_ACCESS_KEY = "testSecretAccessKey";
+
+    @Test
+    public void delegationTokensRequiredShouldReturnFalseWithoutCredentials() {
+        S3DelegationTokenProvider provider = new S3DelegationTokenProvider();
+        provider.init(new Configuration());
+
+        assertFalse(provider.delegationTokensRequired());
+    }
+
+    @Test
+    public void delegationTokensRequiredShouldReturnTrueWithCredentials() {
+        S3DelegationTokenProvider provider = new S3DelegationTokenProvider();
+        Configuration configuration = new Configuration();
+        configuration.setString(CONFIG_PREFIX + ".s3.region", REGION);
+        configuration.setString(CONFIG_PREFIX + ".s3.access-key", ACCESS_KEY_ID);
+        configuration.setString(CONFIG_PREFIX + ".s3.secret-key", SECRET_ACCESS_KEY);
+        provider.init(configuration);
+
+        assertTrue(provider.delegationTokensRequired());
+    }
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiverTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiverTest.java
new file mode 100644
index 00000000000..ab817b0a532
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiverTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.token;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
+import static org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver.PROVIDER_CONFIG_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/** Tests for {@link S3DelegationTokenReceiver}. */
+public class S3DelegationTokenReceiverTest {
+
+    private static final String PROVIDER_CLASS_NAME = "TestProvider";
+    private static final String REGION = "testRegion";
+
+    @BeforeEach
+    void beforeEach() {
+        S3DelegationTokenReceiver.region = null;
+    }
+
+    @AfterEach
+    void afterEach() {
+        S3DelegationTokenReceiver.region = null;
+    }
+
+    @Test
+    public void updateHadoopConfigShouldSetProviderWhenEmpty() {
+        org.apache.hadoop.conf.Configuration hadoopConfiguration =
+                new org.apache.hadoop.conf.Configuration();
+        hadoopConfiguration.set(PROVIDER_CONFIG_NAME, "");
+        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        assertEquals(
+                DynamicTemporaryAWSCredentialsProvider.NAME,
+                hadoopConfiguration.get(PROVIDER_CONFIG_NAME));
+    }
+
+    @Test
+    public void updateHadoopConfigShouldPrependProviderWhenNotEmpty() {
+        org.apache.hadoop.conf.Configuration hadoopConfiguration =
+                new org.apache.hadoop.conf.Configuration();
+        hadoopConfiguration.set(PROVIDER_CONFIG_NAME, PROVIDER_CLASS_NAME);
+        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        String[] providers = hadoopConfiguration.get(PROVIDER_CONFIG_NAME).split(",");
+        assertEquals(2, providers.length);
+        assertEquals(DynamicTemporaryAWSCredentialsProvider.NAME, providers[0]);
+        assertEquals(PROVIDER_CLASS_NAME, providers[1]);
+    }
+
+    @Test
+    public void updateHadoopConfigShouldNotAddProviderWhenAlreadyExists() {
+        org.apache.hadoop.conf.Configuration hadoopConfiguration =
+                new org.apache.hadoop.conf.Configuration();
+        hadoopConfiguration.set(PROVIDER_CONFIG_NAME, DynamicTemporaryAWSCredentialsProvider.NAME);
+        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        assertEquals(
+                DynamicTemporaryAWSCredentialsProvider.NAME,
+                hadoopConfiguration.get(PROVIDER_CONFIG_NAME));
+    }
+
+    @Test
+    public void updateHadoopConfigShouldNotUpdateRegionWhenNotConfigured() {
+        S3DelegationTokenReceiver receiver = new S3DelegationTokenReceiver();
+        receiver.init(new Configuration());
+
+        org.apache.hadoop.conf.Configuration hadoopConfiguration =
+                new org.apache.hadoop.conf.Configuration();
+        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        assertNull(hadoopConfiguration.get("fs.s3a.endpoint.region"));
+    }
+
+    @Test
+    public void updateHadoopConfigShouldUpdateRegionWhenConfigured() {
+        S3DelegationTokenReceiver receiver = new S3DelegationTokenReceiver();
+        Configuration configuration = new Configuration();
+        configuration.setString(CONFIG_PREFIX + ".s3.region", REGION);
+        receiver.init(configuration);
+
+        org.apache.hadoop.conf.Configuration hadoopConfiguration =
+                new org.apache.hadoop.conf.Configuration();
+        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        assertEquals(REGION, hadoopConfiguration.get("fs.s3a.endpoint.region"));
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 92c1a5ff79d..094a8c85a2b 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -390,7 +390,10 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
             heartbeatServices = createHeartbeatServices(configuration);
             delegationTokenManager =
                     DefaultDelegationTokenManagerFactory.create(
-                            configuration, commonRpcService.getScheduledExecutor(), ioExecutor);
+                            configuration,
+                            pluginManager,
+                            commonRpcService.getScheduledExecutor(),
+                            ioExecutor);
             metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);
 
             final RpcService metricQueryServiceRpcService =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1d48126e141..ad708d96605 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -433,10 +433,14 @@ public class MiniCluster implements AutoCloseableAsync {
 
                 delegationTokenManager =
                         DefaultDelegationTokenManagerFactory.create(
-                                configuration, commonRpcService.getScheduledExecutor(), ioExecutor);
+                                configuration,
+                                miniClusterConfiguration.getPluginManager(),
+                                commonRpcService.getScheduledExecutor(),
+                                ioExecutor);
 
                 delegationTokenReceiverRepository =
-                        new DelegationTokenReceiverRepository(configuration);
+                        new DelegationTokenReceiverRepository(
+                                configuration, miniClusterConfiguration.getPluginManager());
 
                 blobCacheService =
                         BlobUtils.createBlobCacheService(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
index fb5a195d2cc..b59abbe59cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.security.token;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -41,10 +44,12 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.Stream;
 
 import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF;
 import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO;
+import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -70,6 +75,8 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    @Nullable private final PluginManager pluginManager;
+
     private final double tokensRenewalTimeRatio;
 
     private final long renewalRetryBackoffPeriod;
@@ -92,15 +99,17 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager {
 
     public DefaultDelegationTokenManager(
             Configuration configuration,
+            @Nullable PluginManager pluginManager,
             @Nullable ScheduledExecutor scheduledExecutor,
             @Nullable ExecutorService ioExecutor) {
         this.configuration = checkNotNull(configuration, "Flink configuration must not be null");
+        this.pluginManager = pluginManager;
         this.tokensRenewalTimeRatio = configuration.get(DELEGATION_TOKENS_RENEWAL_TIME_RATIO);
         this.renewalRetryBackoffPeriod =
                 configuration.get(DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
         this.delegationTokenProviders = loadProviders();
         this.delegationTokenReceiverRepository =
-                new DelegationTokenReceiverRepository(configuration);
+                new DelegationTokenReceiverRepository(configuration, pluginManager);
         this.scheduledExecutor = scheduledExecutor;
         this.ioExecutor = ioExecutor;
         checkProviderAndReceiverConsistency(
@@ -111,36 +120,39 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager {
     private Map<String, DelegationTokenProvider> loadProviders() {
         LOG.info("Loading delegation token providers");
 
-        ServiceLoader<DelegationTokenProvider> serviceLoader =
-                ServiceLoader.load(DelegationTokenProvider.class);
-
         Map<String, DelegationTokenProvider> providers = new HashMap<>();
-        for (DelegationTokenProvider provider : serviceLoader) {
-            try {
-                if (isProviderEnabled(configuration, provider.serviceName())) {
-                    provider.init(configuration);
-                    LOG.info(
-                            "Delegation token provider {} loaded and initialized",
-                            provider.serviceName());
-                    checkState(
-                            !providers.containsKey(provider.serviceName()),
-                            "Delegation token provider with service name {} has multiple implementations",
-                            provider.serviceName());
-                    providers.put(provider.serviceName(), provider);
-                } else {
-                    LOG.info(
-                            "Delegation token provider {} is disabled so not loaded",
-                            provider.serviceName());
-                }
-            } catch (Exception | NoClassDefFoundError e) {
-                // The intentional general rule is that if a provider's init method throws exception
-                // then stop the workload
-                LOG.error(
-                        "Failed to initialize delegation token provider {}",
-                        provider.serviceName(),
-                        e);
-                throw new FlinkRuntimeException(e);
-            }
+        Consumer<DelegationTokenProvider> loadProvider =
+                (provider) -> {
+                    try {
+                        if (isProviderEnabled(configuration, provider.serviceName())) {
+                            provider.init(configuration);
+                            LOG.info(
+                                    "Delegation token provider {} loaded and initialized",
+                                    provider.serviceName());
+                            checkState(
+                                    !providers.containsKey(provider.serviceName()),
+                                    "Delegation token provider with service name {} has multiple implementations",
+                                    provider.serviceName());
+                            providers.put(provider.serviceName(), provider);
+                        } else {
+                            LOG.info(
+                                    "Delegation token provider {} is disabled so not loaded",
+                                    provider.serviceName());
+                        }
+                    } catch (Exception | NoClassDefFoundError e) {
+                        // The intentional general rule is that if a provider's init method throws
+                        // exception
+                        // then stop the workload
+                        LOG.error(
+                                "Failed to initialize delegation token provider {}",
+                                provider.serviceName(),
+                                e);
+                        throw new FlinkRuntimeException(e);
+                    }
+                };
+        ServiceLoader.load(DelegationTokenProvider.class).iterator().forEachRemaining(loadProvider);
+        if (pluginManager != null) {
+            pluginManager.load(DelegationTokenProvider.class).forEachRemaining(loadProvider);
         }
 
         LOG.info("Delegation token providers loaded successfully");
@@ -150,7 +162,7 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager {
 
     static boolean isProviderEnabled(Configuration configuration, String serviceName) {
         return configuration.getBoolean(
-                String.format("security.delegation.token.provider.%s.enabled", serviceName), true);
+                String.format("%s.%s.enabled", CONFIG_PREFIX, serviceName), true);
     }
 
     @VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java
index 025b535bd67..fca48273516 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.security.token;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import org.slf4j.Logger;
@@ -40,12 +41,14 @@ public class DefaultDelegationTokenManagerFactory {
 
     public static DelegationTokenManager create(
             Configuration configuration,
+            @Nullable PluginManager pluginManager,
             @Nullable ScheduledExecutor scheduledExecutor,
             @Nullable ExecutorService ioExecutor)
             throws IOException {
 
         if (configuration.getBoolean(SecurityOptions.DELEGATION_TOKENS_ENABLED)) {
-            return new DefaultDelegationTokenManager(configuration, scheduledExecutor, ioExecutor);
+            return new DefaultDelegationTokenManager(
+                    configuration, pluginManager, scheduledExecutor, ioExecutor);
         } else {
             return new NoOpDelegationTokenManager();
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java
index 8e63c3b4390..6d8f6710edc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java
@@ -21,15 +21,20 @@ package org.apache.flink.runtime.security.token;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.function.Consumer;
 
 import static org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.isProviderEnabled;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -44,46 +49,53 @@ public class DelegationTokenReceiverRepository {
 
     private final Configuration configuration;
 
+    @Nullable private final PluginManager pluginManager;
+
     @VisibleForTesting final Map<String, DelegationTokenReceiver> delegationTokenReceivers;
 
-    public DelegationTokenReceiverRepository(Configuration configuration) {
+    public DelegationTokenReceiverRepository(
+            Configuration configuration, @Nullable PluginManager pluginManager) {
         this.configuration = checkNotNull(configuration, "Flink configuration must not be null");
+        this.pluginManager = pluginManager;
         this.delegationTokenReceivers = loadReceivers();
     }
 
     private Map<String, DelegationTokenReceiver> loadReceivers() {
         LOG.info("Loading delegation token receivers");
 
-        ServiceLoader<DelegationTokenReceiver> serviceLoader =
-                ServiceLoader.load(DelegationTokenReceiver.class);
-
         Map<String, DelegationTokenReceiver> receivers = new HashMap<>();
-        for (DelegationTokenReceiver receiver : serviceLoader) {
-            try {
-                if (isProviderEnabled(configuration, receiver.serviceName())) {
-                    receiver.init(configuration);
-                    LOG.info(
-                            "Delegation token receiver {} loaded and initialized",
-                            receiver.serviceName());
-                    checkState(
-                            !receivers.containsKey(receiver.serviceName()),
-                            "Delegation token receiver with service name {} has multiple implementations",
-                            receiver.serviceName());
-                    receivers.put(receiver.serviceName(), receiver);
-                } else {
-                    LOG.info(
-                            "Delegation token receiver {} is disabled so not loaded",
-                            receiver.serviceName());
-                }
-            } catch (Exception | NoClassDefFoundError e) {
-                // The intentional general rule is that if a receiver's init method throws exception
-                // then stop the workload
-                LOG.error(
-                        "Failed to initialize delegation token receiver {}",
-                        receiver.serviceName(),
-                        e);
-                throw new FlinkRuntimeException(e);
-            }
+        Consumer<DelegationTokenReceiver> loadReceiver =
+                (receiver) -> {
+                    try {
+                        if (isProviderEnabled(configuration, receiver.serviceName())) {
+                            receiver.init(configuration);
+                            LOG.info(
+                                    "Delegation token receiver {} loaded and initialized",
+                                    receiver.serviceName());
+                            checkState(
+                                    !receivers.containsKey(receiver.serviceName()),
+                                    "Delegation token receiver with service name {} has multiple implementations",
+                                    receiver.serviceName());
+                            receivers.put(receiver.serviceName(), receiver);
+                        } else {
+                            LOG.info(
+                                    "Delegation token receiver {} is disabled so not loaded",
+                                    receiver.serviceName());
+                        }
+                    } catch (Exception | NoClassDefFoundError e) {
+                        // The intentional general rule is that if a receiver's init method throws
+                        // exception
+                        // then stop the workload
+                        LOG.error(
+                                "Failed to initialize delegation token receiver {}",
+                                receiver.serviceName(),
+                                e);
+                        throw new FlinkRuntimeException(e);
+                    }
+                };
+        ServiceLoader.load(DelegationTokenReceiver.class).iterator().forEachRemaining(loadReceiver);
+        if (pluginManager != null) {
+            pluginManager.load(DelegationTokenReceiver.class).forEachRemaining(loadReceiver);
         }
 
         LOG.info("Delegation token receivers loaded successfully");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
index 7e8dd66ca54..cdabe8a9d12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.security.token.DelegationTokenProvider;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
 import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.util.Preconditions;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java
index 0b1cba0f738..f5d912b2b85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.security.token.DelegationTokenReceiver;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
 
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
index 23202d27df9..960a80509d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.runtime.security.token.DelegationTokenProvider;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
 import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index a7207e1fd8c..0bbd2d9ee6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -243,7 +243,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
                             configuration, pluginManager);
 
             final DelegationTokenReceiverRepository delegationTokenReceiverRepository =
-                    new DelegationTokenReceiverRepository(configuration);
+                    new DelegationTokenReceiverRepository(configuration, pluginManager);
 
             taskExecutorService =
                     taskExecutorServiceFactory.createTaskExecutor(
diff --git a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
similarity index 100%
rename from flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
rename to flink-runtime/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
diff --git a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
similarity index 100%
rename from flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
rename to flink-runtime/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
index ed34ea978de..e211389573b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
 import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 
@@ -35,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.time.Instant.ofEpochMilli;
 import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO;
-import static org.apache.flink.runtime.security.token.DelegationTokenProvider.CONFIG_PREFIX;
+import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -73,7 +75,8 @@ public class DefaultDelegationTokenManagerTest {
 
     @Test
     public void configurationIsNullMustFailFast() {
-        assertThrows(Exception.class, () -> new DefaultDelegationTokenManager(null, null, null));
+        assertThrows(
+                Exception.class, () -> new DefaultDelegationTokenManager(null, null, null, null));
     }
 
     @Test
@@ -82,7 +85,7 @@ public class DefaultDelegationTokenManagerTest {
                 Exception.class,
                 () -> {
                     ExceptionThrowingDelegationTokenProvider.throwInInit = true;
-                    new DefaultDelegationTokenManager(new Configuration(), null, null);
+                    new DefaultDelegationTokenManager(new Configuration(), null, null, null);
                 });
     }
 
@@ -91,7 +94,7 @@ public class DefaultDelegationTokenManagerTest {
         Configuration configuration = new Configuration();
         configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false);
         DefaultDelegationTokenManager delegationTokenManager =
-                new DefaultDelegationTokenManager(configuration, null, null);
+                new DefaultDelegationTokenManager(configuration, null, null, null);
 
         assertEquals(3, delegationTokenManager.delegationTokenProviders.size());
 
@@ -171,7 +174,8 @@ public class DefaultDelegationTokenManagerTest {
         configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", true);
         AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0);
         DefaultDelegationTokenManager delegationTokenManager =
-                new DefaultDelegationTokenManager(configuration, scheduledExecutor, scheduler) {
+                new DefaultDelegationTokenManager(
+                        configuration, null, scheduledExecutor, scheduler) {
                     @Override
                     void startTokensUpdate() {
                         startTokensUpdateCallCount.incrementAndGet();
@@ -197,7 +201,7 @@ public class DefaultDelegationTokenManagerTest {
         configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false);
         configuration.set(DELEGATION_TOKENS_RENEWAL_TIME_RATIO, 0.5);
         DefaultDelegationTokenManager delegationTokenManager =
-                new DefaultDelegationTokenManager(configuration, null, null);
+                new DefaultDelegationTokenManager(configuration, null, null, null);
 
         Clock constantClock = Clock.fixed(ofEpochMilli(100), ZoneId.systemDefault());
         assertEquals(50, delegationTokenManager.calculateRenewalDelay(constantClock, 200));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
index db93128d7ab..b1605226bd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
@@ -24,7 +24,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import static org.apache.flink.runtime.security.token.DelegationTokenProvider.CONFIG_PREFIX;
+import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -45,7 +45,7 @@ class DelegationTokenReceiverRepositoryTest {
 
     @Test
     public void configurationIsNullMustFailFast() {
-        assertThrows(Exception.class, () -> new DelegationTokenReceiverRepository(null));
+        assertThrows(Exception.class, () -> new DelegationTokenReceiverRepository(null, null));
     }
 
     @Test
@@ -54,7 +54,7 @@ class DelegationTokenReceiverRepositoryTest {
                 Exception.class,
                 () -> {
                     ExceptionThrowingDelegationTokenReceiver.throwInInit = true;
-                    new DelegationTokenReceiverRepository(new Configuration());
+                    new DelegationTokenReceiverRepository(new Configuration(), null);
                 });
     }
 
@@ -63,7 +63,7 @@ class DelegationTokenReceiverRepositoryTest {
         Configuration configuration = new Configuration();
         configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false);
         DelegationTokenReceiverRepository delegationTokenReceiverRepository =
-                new DelegationTokenReceiverRepository(configuration);
+                new DelegationTokenReceiverRepository(configuration, null);
 
         assertEquals(3, delegationTokenReceiverRepository.delegationTokenReceivers.size());
         assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("hadoopfs"));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
index 05b2875a8d1..93a78420056 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
 
 import java.util.Optional;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
index 41e72c11cce..f1d919f34dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
 
 /**
  * An example implementation of {@link DelegationTokenReceiver} which throws exception when enabled.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java
index a5cf75d3dc4..384bf0881bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
 
 /** An example implementation of {@link DelegationTokenProvider} which does nothing. */
 public class TestDelegationTokenProvider implements DelegationTokenProvider {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java
index 405d026a3a1..5a6c4b5e025 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
 
 /** An example implementation of {@link DelegationTokenReceiver} which does nothing. */
 public class TestDelegationTokenReceiver implements DelegationTokenReceiver {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
index 362b4de5957..7ef93ab4a3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
@@ -145,7 +145,7 @@ public class TaskExecutorBuilder {
         }
 
         final DelegationTokenReceiverRepository delegationTokenReceiverRepository =
-                new DelegationTokenReceiverRepository(configuration);
+                new DelegationTokenReceiverRepository(configuration, null);
 
         return new TaskExecutor(
                 rpcService,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
index 6933d29b551..caf60130b5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
@@ -253,7 +253,7 @@ public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogge
                 NoOpTaskExecutorBlobService.INSTANCE,
                 testingFatalErrorHandlerResource.getFatalErrorHandler(),
                 new TestingTaskExecutorPartitionTracker(),
-                new DelegationTokenReceiverRepository(configuration));
+                new DelegationTokenReceiverRepository(configuration, null));
     }
 
     private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobID jobId)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index e2afdfb703a..9d31376451c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -601,7 +601,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
                 NoOpTaskExecutorBlobService.INSTANCE,
                 new TestingFatalErrorHandler(),
                 partitionTracker,
-                new DelegationTokenReceiverRepository(configuration));
+                new DelegationTokenReceiverRepository(configuration, null));
     }
 
     private static TaskSlotTable<Task> createTaskSlotTable() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
index 55cb4f7c6b5..9a1d15f0fd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
@@ -238,7 +238,7 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger {
                 NoOpTaskExecutorBlobService.INSTANCE,
                 testingFatalErrorHandlerResource.getFatalErrorHandler(),
                 new TestingTaskExecutorPartitionTracker(),
-                new DelegationTokenReceiverRepository(configuration));
+                new DelegationTokenReceiverRepository(configuration, null));
     }
 
     private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 6d3a885ca43..c13e19a121c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -2814,7 +2814,7 @@ public class TaskExecutorTest extends TestLogger {
                 NoOpTaskExecutorBlobService.INSTANCE,
                 testingFatalErrorHandler,
                 taskExecutorPartitionTracker,
-                new DelegationTokenReceiverRepository(configuration));
+                new DelegationTokenReceiverRepository(configuration, null));
     }
 
     private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices)
@@ -2850,7 +2850,7 @@ public class TaskExecutorTest extends TestLogger {
                 NoOpTaskExecutorBlobService.INSTANCE,
                 testingFatalErrorHandler,
                 new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
-                new DelegationTokenReceiverRepository(configuration));
+                new DelegationTokenReceiverRepository(configuration, null));
     }
 
     private TaskExecutorTestingContext createTaskExecutorTestingContext(int numberOfSlots)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index 79ff1dc541d..d2e113a4057 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -294,6 +294,6 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
                 ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                 workingDirectory,
                 error -> {},
-                new DelegationTokenReceiverRepository(configuration));
+                new DelegationTokenReceiverRepository(configuration, null));
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 70f42962e51..ebfd25ac3e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -270,7 +270,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
                 taskExecutorBlobService,
                 testingFatalErrorHandler,
                 new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
-                new DelegationTokenReceiverRepository(configuration));
+                new DelegationTokenReceiverRepository(configuration, null));
     }
 
     private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
similarity index 100%
rename from flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
rename to flink-runtime/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
similarity index 100%
rename from flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
rename to flink-runtime/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
diff --git a/flink-tests/src/test/java/org/apache/flink/test/plugin/DefaultPluginManagerTest.java b/flink-tests/src/test/java/org/apache/flink/test/plugin/DefaultPluginManagerTest.java
index 05af53f2b39..5ec9bc670e9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/plugin/DefaultPluginManagerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/DefaultPluginManagerTest.java
@@ -23,11 +23,11 @@ import org.apache.flink.core.plugin.DirectoryBasedPluginFinder;
 import org.apache.flink.core.plugin.PluginDescriptor;
 import org.apache.flink.core.plugin.PluginFinder;
 import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.test.plugin.jar.pluginb.TestServiceB;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -42,6 +42,12 @@ import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for {@link DefaultPluginManager}. */
 public class DefaultPluginManagerTest extends PluginTestBase {
@@ -86,22 +92,45 @@ public class DefaultPluginManagerTest extends PluginTestBase {
         final PluginManager pluginManager =
                 new DefaultPluginManager(descriptors, PARENT_CLASS_LOADER, parentPatterns);
         final List<TestSpi> serviceImplList = Lists.newArrayList(pluginManager.load(TestSpi.class));
-        Assert.assertEquals(2, serviceImplList.size());
+        assertEquals(2, serviceImplList.size());
 
         // check that all impl have unique classloader
         final Set<ClassLoader> classLoaders = Collections.newSetFromMap(new IdentityHashMap<>(3));
         classLoaders.add(PARENT_CLASS_LOADER);
         for (TestSpi testSpi : serviceImplList) {
-            Assert.assertNotNull(testSpi.testMethod());
-            Assert.assertTrue(classLoaders.add(testSpi.getClass().getClassLoader()));
+            assertNotNull(testSpi.testMethod());
+            assertTrue(classLoaders.add(testSpi.getClass().getClassLoader()));
         }
 
         final List<OtherTestSpi> otherServiceImplList =
                 Lists.newArrayList(pluginManager.load(OtherTestSpi.class));
-        Assert.assertEquals(1, otherServiceImplList.size());
+        assertEquals(1, otherServiceImplList.size());
         for (OtherTestSpi otherTestSpi : otherServiceImplList) {
-            Assert.assertNotNull(otherTestSpi.otherTestMethod());
-            Assert.assertTrue(classLoaders.add(otherTestSpi.getClass().getClassLoader()));
+            assertNotNull(otherTestSpi.otherTestMethod());
+            assertFalse(classLoaders.add(otherTestSpi.getClass().getClassLoader()));
         }
     }
+
+    @Test
+    public void classLoaderMustBeTheSameInsideAPlugin() {
+        String[] parentPatterns = {TestSpi.class.getName(), OtherTestSpi.class.getName()};
+        final PluginManager pluginManager =
+                new DefaultPluginManager(descriptors, PARENT_CLASS_LOADER, parentPatterns);
+        final List<TestSpi> serviceImplList = Lists.newArrayList(pluginManager.load(TestSpi.class));
+        assertEquals(2, serviceImplList.size());
+
+        final List<OtherTestSpi> otherServiceImplList =
+                Lists.newArrayList(pluginManager.load(OtherTestSpi.class));
+        assertEquals(1, otherServiceImplList.size());
+
+        // instanceof with multiple classloaders works only this way
+        final List<TestSpi> serviceBImplList =
+                serviceImplList.stream()
+                        .filter(s -> s.getClass().getName().equals(TestServiceB.class.getName()))
+                        .collect(Collectors.toList());
+        assertEquals(1, serviceBImplList.size());
+        assertEquals(
+                otherServiceImplList.get(0).getClass().getClassLoader(),
+                serviceBImplList.get(0).getClass().getClassLoader());
+    }
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 2653fc46d9f..3f4d121ca71 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1296,7 +1296,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
         LOG.info("Adding delegation tokens to the AM container.");
 
         DelegationTokenManager delegationTokenManager =
-                new DefaultDelegationTokenManager(flinkConfiguration, null, null);
+                new DefaultDelegationTokenManager(flinkConfiguration, null, null, null);
         DelegationTokenContainer container = new DelegationTokenContainer();
         delegationTokenManager.obtainDelegationTokens(container);