You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2023/01/20 10:45:06 UTC
[flink] branch master updated: [FLINK-30704][filesystems][s3] Add S3 delegation token support
This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0141f13ca80 [FLINK-30704][filesystems][s3] Add S3 delegation token support
0141f13ca80 is described below
commit 0141f13ca801d5db45435d101a9c3ef83889bbc0
Author: Gabor Somogyi <ga...@apple.com>
AuthorDate: Fri Jan 6 12:20:42 2023 +0100
[FLINK-30704][filesystems][s3] Add S3 delegation token support
---
.../flink/core/plugin/DefaultPluginManager.java | 42 +++++++-
.../security/token/DelegationTokenProvider.java | 8 +-
.../security/token/DelegationTokenReceiver.java | 2 +-
.../fs/s3/common/AbstractS3FileSystemFactory.java | 2 +
.../DynamicTemporaryAWSCredentialsProvider.java | 65 ++++++++++++
.../s3/common/token/S3DelegationTokenProvider.java | 112 +++++++++++++++++++++
.../s3/common/token/S3DelegationTokenReceiver.java | 106 +++++++++++++++++++
...ink.core.security.token.DelegationTokenProvider | 3 +-
...ink.core.security.token.DelegationTokenReceiver | 3 +-
...DynamicTemporaryAWSCredentialsProviderTest.java | 73 ++++++++++++++
.../token/S3DelegationTokenProviderTest.java | 55 ++++++++++
.../token/S3DelegationTokenReceiverTest.java | 105 +++++++++++++++++++
.../runtime/entrypoint/ClusterEntrypoint.java | 5 +-
.../flink/runtime/minicluster/MiniCluster.java | 8 +-
.../token/DefaultDelegationTokenManager.java | 74 ++++++++------
.../DefaultDelegationTokenManagerFactory.java | 5 +-
.../token/DelegationTokenReceiverRepository.java | 72 +++++++------
.../token/hadoop/HBaseDelegationTokenProvider.java | 2 +-
.../hadoop/HadoopDelegationTokenReceiver.java | 2 +-
.../hadoop/HadoopFSDelegationTokenProvider.java | 2 +-
.../runtime/taskexecutor/TaskManagerRunner.java | 2 +-
...nk.core.security.token.DelegationTokenProvider} | 0
...nk.core.security.token.DelegationTokenReceiver} | 0
.../token/DefaultDelegationTokenManagerTest.java | 16 +--
.../DelegationTokenReceiverRepositoryTest.java | 8 +-
.../ExceptionThrowingDelegationTokenProvider.java | 1 +
.../ExceptionThrowingDelegationTokenReceiver.java | 1 +
.../token/TestDelegationTokenProvider.java | 1 +
.../token/TestDelegationTokenReceiver.java | 1 +
.../runtime/taskexecutor/TaskExecutorBuilder.java | 2 +-
...cutorExecutionDeploymentReconciliationTest.java | 2 +-
.../TaskExecutorPartitionLifecycleTest.java | 2 +-
.../taskexecutor/TaskExecutorSlotLifetimeTest.java | 2 +-
.../runtime/taskexecutor/TaskExecutorTest.java | 4 +-
.../taskexecutor/TaskManagerRunnerStartupTest.java | 2 +-
.../TaskSubmissionTestEnvironment.java | 2 +-
...nk.core.security.token.DelegationTokenProvider} | 0
...nk.core.security.token.DelegationTokenReceiver} | 0
.../test/plugin/DefaultPluginManagerTest.java | 43 ++++++--
.../apache/flink/yarn/YarnClusterDescriptor.java | 2 +-
40 files changed, 730 insertions(+), 107 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/DefaultPluginManager.java b/flink-core/src/main/java/org/apache/flink/core/plugin/DefaultPluginManager.java
index be8825bd0ca..07a3a533900 100644
--- a/flink-core/src/main/java/org/apache/flink/core/plugin/DefaultPluginManager.java
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/DefaultPluginManager.java
@@ -21,20 +21,31 @@ package org.apache.flink.core.plugin;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/** Default implementation of {@link PluginManager}. */
@Internal
@ThreadSafe
public class DefaultPluginManager implements PluginManager {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultPluginManager.class);
+
/**
* Parent-classloader to all classloader that are used for plugin loading. We expect that this
* is thread-safe.
@@ -44,6 +55,11 @@ public class DefaultPluginManager implements PluginManager {
/** A collection of descriptions of all plugins known to this plugin manager. */
private final Collection<PluginDescriptor> pluginDescriptors;
+ private final Lock pluginLoadersLock;
+
+ @GuardedBy("pluginLoadersLock")
+ private final Map<String, PluginLoader> pluginLoaders;
+
/** List of patterns for classes that should always be resolved from the parent ClassLoader. */
private final String[] alwaysParentFirstPatterns;
@@ -51,6 +67,8 @@ public class DefaultPluginManager implements PluginManager {
DefaultPluginManager() {
parentClassLoader = null;
pluginDescriptors = null;
+ pluginLoadersLock = null;
+ pluginLoaders = null;
alwaysParentFirstPatterns = null;
}
@@ -67,6 +85,8 @@ public class DefaultPluginManager implements PluginManager {
ClassLoader parentClassLoader,
String[] alwaysParentFirstPatterns) {
this.pluginDescriptors = pluginDescriptors;
+ this.pluginLoadersLock = new ReentrantLock();
+ this.pluginLoaders = new HashMap<>();
this.parentClassLoader = parentClassLoader;
this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
}
@@ -75,9 +95,23 @@ public class DefaultPluginManager implements PluginManager {
public <P> Iterator<P> load(Class<P> service) {
ArrayList<Iterator<P>> combinedIterators = new ArrayList<>(pluginDescriptors.size());
for (PluginDescriptor pluginDescriptor : pluginDescriptors) {
- PluginLoader pluginLoader =
- PluginLoader.create(
- pluginDescriptor, parentClassLoader, alwaysParentFirstPatterns);
+ PluginLoader pluginLoader;
+ String pluginId = pluginDescriptor.getPluginId();
+ pluginLoadersLock.lock();
+ try {
+ if (pluginLoaders.containsKey(pluginId)) {
+ LOG.info("Plugin loader with ID found, reusing it: {}", pluginId);
+ pluginLoader = pluginLoaders.get(pluginId);
+ } else {
+ LOG.info("Plugin loader with ID not found, creating it: {}", pluginId);
+ pluginLoader =
+ PluginLoader.create(
+ pluginDescriptor, parentClassLoader, alwaysParentFirstPatterns);
+ pluginLoaders.putIfAbsent(pluginId, pluginLoader);
+ }
+ } finally {
+ pluginLoadersLock.unlock();
+ }
combinedIterators.add(pluginLoader.load(service));
}
return Iterators.concat(combinedIterators.iterator());
@@ -90,6 +124,8 @@ public class DefaultPluginManager implements PluginManager {
+ parentClassLoader
+ ", pluginDescriptors="
+ pluginDescriptors
+ + ", pluginLoaders="
+ + Joiner.on(",").withKeyValueSeparator("=").join(pluginLoaders)
+ ", alwaysParentFirstPatterns="
+ Arrays.toString(alwaysParentFirstPatterns)
+ '}';
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java b/flink-core/src/main/java/org/apache/flink/core/security/token/DelegationTokenProvider.java
similarity index 91%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
rename to flink-core/src/main/java/org/apache/flink/core/security/token/DelegationTokenProvider.java
index c6d7dcea413..676148564b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
+++ b/flink-core/src/main/java/org/apache/flink/core/security/token/DelegationTokenProvider.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.core.security.token;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.Configuration;
@@ -24,8 +24,8 @@ import org.apache.flink.configuration.Configuration;
import java.util.Optional;
/**
- * Delegation token provider API. Instances of {@link DelegationTokenProvider}s are loaded by {@link
- * DelegationTokenManager} through service loader. Basically the implementation of this interface is
+ * Delegation token provider API. Instances of {@link DelegationTokenProvider}s are loaded by
+ * DelegationTokenManager through service loader. Basically the implementation of this interface is
* responsible to produce the serialized form of tokens which will be handled by {@link
* DelegationTokenReceiver} instances both on JobManager and TaskManager side.
*/
@@ -69,7 +69,7 @@ public interface DelegationTokenProvider {
}
/**
- * Called by {@link DelegationTokenManager} to initialize provider after construction.
+ * Called by DelegationTokenManager to initialize provider after construction.
*
* @param configuration Configuration to initialize the provider.
*/
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java b/flink-core/src/main/java/org/apache/flink/core/security/token/DelegationTokenReceiver.java
similarity index 97%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java
rename to flink-core/src/main/java/org/apache/flink/core/security/token/DelegationTokenReceiver.java
index b296186edc3..78918100427 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java
+++ b/flink-core/src/main/java/org/apache/flink/core/security/token/DelegationTokenReceiver.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.core.security.token;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.Configuration;
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 024abbdc12d..0dac58be3bf 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.apache.flink.util.Preconditions;
@@ -123,6 +124,7 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
// create the Hadoop FileSystem
org.apache.hadoop.conf.Configuration hadoopConfig =
hadoopConfigLoader.getOrLoadHadoopConfig();
+ S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
org.apache.hadoop.fs.FileSystem fs = createHadoopFileSystem();
fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
new file mode 100644
index 00000000000..8760f1aa3ee
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.token;
+
+import org.apache.flink.annotation.Internal;
+
+import com.amazonaws.SdkBaseException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicSessionCredentials;
+import com.amazonaws.services.securitytoken.model.Credentials;
+import org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AWS credentials provider serving the session credentials last received by {@link
+ * S3DelegationTokenReceiver}. Users may reference this class name from the configuration property
+ * fs.s3a.aws.credentials.provider, so renaming it would be a backward-incompatible change.
+ * Creating an instance must not fail because that would break a chain of credential providers.
+ */
+@Internal
+public class DynamicTemporaryAWSCredentialsProvider implements AWSCredentialsProvider {
+
+    public static final String NAME = DynamicTemporaryAWSCredentialsProvider.class.getName();
+
+    public static final String COMPONENT = "Dynamic session credentials for Flink";
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicTemporaryAWSCredentialsProvider.class);
+
+    /** Returns the current session credentials; throws when none have been received yet. */
+    @Override
+    public AWSCredentials getCredentials() throws SdkBaseException {
+        final Credentials received = S3DelegationTokenReceiver.getCredentials();
+        if (received != null) {
+            LOG.debug("Providing session credentials");
+            return new BasicSessionCredentials(
+                    received.getAccessKeyId(),
+                    received.getSecretAccessKey(),
+                    received.getSessionToken());
+        }
+        throw new NoAwsCredentialsException(COMPONENT);
+    }
+
+    /** No-op: the credentials are replaced by S3DelegationTokenReceiver, not pulled here. */
+    @Override
+    public void refresh() {}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProvider.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProvider.java
new file mode 100644
index 00000000000..4c04b975908
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProvider.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.StringUtils;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
+import com.amazonaws.services.securitytoken.model.Credentials;
+import com.amazonaws.services.securitytoken.model.GetSessionTokenResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Delegation token provider for S3 filesystems based on STS session credentials. */
+@Internal
+public class S3DelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DelegationTokenProvider.class);
+
+    private String region;
+    private String accessKey;
+    private String secretKey;
+
+    @Override
+    public String serviceName() {
+        return "s3";
+    }
+
+    /** Reads region, access key and secret key from the service specific configuration. */
+    @Override
+    public void init(Configuration configuration) {
+        region = configuration.getString(serviceConfigPrefix() + ".region", null);
+        if (!StringUtils.isNullOrWhitespaceOnly(region)) {
+            LOG.debug("Region: {}", region);
+        }
+        accessKey = configuration.getString(serviceConfigPrefix() + ".access-key", null);
+        if (!StringUtils.isNullOrWhitespaceOnly(accessKey)) {
+            LOG.debug("Access key: {}", accessKey);
+        }
+        secretKey = configuration.getString(serviceConfigPrefix() + ".secret-key", null);
+        if (!StringUtils.isNullOrWhitespaceOnly(secretKey)) {
+            LOG.debug(
+                    "Secret key: {} (sensitive information)", GlobalConfiguration.HIDDEN_CONTENT);
+        }
+    }
+
+    /** Tokens are required only when region, access key and secret key are all configured. */
+    @Override
+    public boolean delegationTokensRequired() {
+        if (StringUtils.isNullOrWhitespaceOnly(region)
+                || StringUtils.isNullOrWhitespaceOnly(accessKey)
+                || StringUtils.isNullOrWhitespaceOnly(secretKey)) {
+            LOG.debug("Not obtaining session credentials because not all configurations are set");
+            return false;
+        }
+        return true;
+    }
+
+    /** Obtains temporary session credentials from STS with the configured access key pair. */
+    @Override
+    public ObtainedDelegationTokens obtainDelegationTokens() throws Exception {
+        LOG.info("Obtaining session credentials token with access key: {}", accessKey);
+        AWSSecurityTokenService stsClient =
+                AWSSecurityTokenServiceClientBuilder.standard()
+                        .withRegion(region)
+                        .withCredentials(
+                                new AWSStaticCredentialsProvider(
+                                        new BasicAWSCredentials(accessKey, secretKey)))
+                        .build();
+        final Credentials credentials;
+        try {
+            GetSessionTokenResult sessionTokenResult = stsClient.getSessionToken();
+            credentials = sessionTokenResult.getCredentials();
+        } finally {
+            // A new client is built on every call, so release its HTTP resources right away.
+            stsClient.shutdown();
+        }
+        LOG.info(
+                "Session credentials obtained successfully with access key: {} expiration: {}",
+                credentials.getAccessKeyId(),
+                credentials.getExpiration());
+
+        return new ObtainedDelegationTokens(
+                InstantiationUtil.serializeObject(credentials),
+                Optional.of(credentials.getExpiration().getTime()));
+    }
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiver.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiver.java
new file mode 100644
index 00000000000..55dd18d3c4b
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiver.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.StringUtils;
+
+import com.amazonaws.services.securitytoken.model.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/** Delegation token receiver for S3 filesystems. */
+@Internal
+public class S3DelegationTokenReceiver implements DelegationTokenReceiver {
+
+    public static final String PROVIDER_CONFIG_NAME = "fs.s3a.aws.credentials.provider";
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3DelegationTokenReceiver.class);
+
+    @VisibleForTesting @Nullable static volatile Credentials credentials;
+
+    @VisibleForTesting @Nullable static volatile String region;
+
+    /** Ensures the dynamic credentials provider is configured and applies the region, if set. */
+    public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig) {
+        LOG.info("Updating Hadoop configuration");
+        String providers = hadoopConfig.get(PROVIDER_CONFIG_NAME, "");
+        if (providers.contains(DynamicTemporaryAWSCredentialsProvider.NAME)) {
+            LOG.debug("Provider already exists");
+        } else if (providers.isEmpty()) {
+            LOG.debug("Setting provider");
+            hadoopConfig.set(PROVIDER_CONFIG_NAME, DynamicTemporaryAWSCredentialsProvider.NAME);
+        } else {
+            providers = DynamicTemporaryAWSCredentialsProvider.NAME + "," + providers;
+            LOG.debug("Prepending provider, new providers value: {}", providers);
+            hadoopConfig.set(PROVIDER_CONFIG_NAME, providers);
+        }
+
+        // Read the volatile field once so the check and the value written cannot differ.
+        String currentRegion = region;
+        if (!StringUtils.isNullOrWhitespaceOnly(currentRegion)) {
+            LOG.debug("Setting region");
+            hadoopConfig.set("fs.s3a.endpoint.region", currentRegion);
+        }
+        LOG.info("Updated Hadoop configuration successfully");
+    }
+
+    @Override
+    public String serviceName() {
+        return "s3";
+    }
+
+    @Override
+    public void init(Configuration configuration) {
+        String configuredRegion =
+                configuration.getString(
+                        DelegationTokenProvider.CONFIG_PREFIX + "." + serviceName() + ".region",
+                        null);
+        region = configuredRegion;
+        if (!StringUtils.isNullOrWhitespaceOnly(configuredRegion)) {
+            LOG.debug("Region: {}", configuredRegion);
+        }
+    }
+
+    @Override
+    public void onNewTokensObtained(byte[] tokens) throws Exception {
+        LOG.info("Updating session credentials");
+        // Deserialize into a local first: the static field may be replaced concurrently.
+        Credentials received =
+                InstantiationUtil.deserializeObject(
+                        tokens, S3DelegationTokenReceiver.class.getClassLoader());
+        credentials = received;
+        LOG.info(
+                "Session credentials updated successfully with access key: {} expiration: {}",
+                received.getAccessKeyId(),
+                received.getExpiration());
+    }
+
+    @Nullable
+    public static Credentials getCredentials() {
+        return credentials;
+    }
+}
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider b/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
similarity index 83%
copy from flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
copy to flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
index bd3ed1d6482..d2e298d1a28 100644
--- a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
+++ b/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
@@ -13,5 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.runtime.security.token.TestDelegationTokenProvider
-org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenProvider
+org.apache.flink.fs.s3.common.token.S3DelegationTokenProvider
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider b/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
similarity index 83%
copy from flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
copy to flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
index bd3ed1d6482..59ba43f7f2a 100644
--- a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
+++ b/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
@@ -13,5 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.runtime.security.token.TestDelegationTokenProvider
-org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenProvider
+org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java
new file mode 100644
index 00000000000..a6b926f8737
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.token;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import com.amazonaws.auth.BasicSessionCredentials;
+import com.amazonaws.services.securitytoken.model.Credentials;
+import org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for {@link DynamicTemporaryAWSCredentialsProvider}. */
+public class DynamicTemporaryAWSCredentialsProviderTest {
+
+    private static final String ACCESS_KEY_ID = "testAccessKeyId";
+    private static final String SECRET_ACCESS_KEY = "testSecretAccessKey";
+    private static final String SESSION_TOKEN = "testSessionToken";
+
+    // The receiver keeps its credentials in a static field, so reset it around every test.
+    @BeforeEach
+    void beforeEach() {
+        S3DelegationTokenReceiver.credentials = null;
+    }
+
+    @AfterEach
+    void afterEach() {
+        S3DelegationTokenReceiver.credentials = null;
+    }
+
+    @Test
+    public void getCredentialsShouldThrowExceptionWhenNoCredentials() {
+        DynamicTemporaryAWSCredentialsProvider provider =
+                new DynamicTemporaryAWSCredentialsProvider();
+
+        assertThrows(NoAwsCredentialsException.class, provider::getCredentials);
+    }
+
+    @Test
+    public void getCredentialsShouldStoreCredentialsWhenCredentialsProvided() throws Exception {
+        DynamicTemporaryAWSCredentialsProvider provider =
+                new DynamicTemporaryAWSCredentialsProvider();
+        Credentials credentials =
+                new Credentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, SESSION_TOKEN, null);
+        S3DelegationTokenReceiver receiver = new S3DelegationTokenReceiver();
+
+        receiver.onNewTokensObtained(InstantiationUtil.serializeObject(credentials));
+        BasicSessionCredentials returned = (BasicSessionCredentials) provider.getCredentials();
+        assertEquals(credentials.getAccessKeyId(), returned.getAWSAccessKeyId());
+        assertEquals(credentials.getSecretAccessKey(), returned.getAWSSecretKey());
+        assertEquals(credentials.getSessionToken(), returned.getSessionToken());
+    }
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProviderTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProviderTest.java
new file mode 100644
index 00000000000..d0109cbc44d
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProviderTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.token;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link S3DelegationTokenProvider}. */
+public class S3DelegationTokenProviderTest {
+
+    private static final String REGION = "testRegion";
+    private static final String ACCESS_KEY_ID = "testAccessKeyId";
+    private static final String SECRET_ACCESS_KEY = "testSecretAccessKey";
+
+    @Test
+    public void delegationTokensRequiredShouldReturnFalseWithoutCredentials() {
+        S3DelegationTokenProvider provider = new S3DelegationTokenProvider();
+        // An empty configuration carries neither region nor access/secret key.
+        provider.init(new Configuration());
+        assertFalse(provider.delegationTokensRequired());
+    }
+
+    @Test
+    public void delegationTokensRequiredShouldReturnTrueWithCredentials() {
+        S3DelegationTokenProvider provider = new S3DelegationTokenProvider();
+        Configuration configuration = new Configuration();
+        configuration.setString(CONFIG_PREFIX + ".s3.region", REGION);
+        configuration.setString(CONFIG_PREFIX + ".s3.access-key", ACCESS_KEY_ID);
+        configuration.setString(CONFIG_PREFIX + ".s3.secret-key", SECRET_ACCESS_KEY);
+        provider.init(configuration);
+
+        assertTrue(provider.delegationTokensRequired());
+    }
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiverTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiverTest.java
new file mode 100644
index 00000000000..ab817b0a532
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiverTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common.token;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
+import static org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver.PROVIDER_CONFIG_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Tests for {@link S3DelegationTokenReceiver}.
+ *
+ * <p>Covers how {@code updateHadoopConfig} rewrites the credentials provider list and the
+ * endpoint region of a Hadoop configuration.
+ */
+public class S3DelegationTokenReceiverTest {
+
+    private static final String PROVIDER_CLASS_NAME = "TestProvider";
+    private static final String REGION = "testRegion";
+
+    /** Hadoop key the region assertions below read back. */
+    private static final String REGION_CONFIG_NAME = "fs.s3a.endpoint.region";
+
+    // The receiver keeps the region in a static field, so it is cleared around every test to
+    // keep the tests independent of execution order. These hooks are only honored because the
+    // tests are declared with Jupiter's @Test.
+    @BeforeEach
+    void beforeEach() {
+        S3DelegationTokenReceiver.region = null;
+    }
+
+    @AfterEach
+    void afterEach() {
+        S3DelegationTokenReceiver.region = null;
+    }
+
+    /** An empty provider list is replaced by the dynamic credentials provider alone. */
+    @Test
+    public void updateHadoopConfigShouldSetProviderWhenEmpty() {
+        org.apache.hadoop.conf.Configuration hadoopConfiguration =
+                new org.apache.hadoop.conf.Configuration();
+        hadoopConfiguration.set(PROVIDER_CONFIG_NAME, "");
+        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        assertEquals(
+                DynamicTemporaryAWSCredentialsProvider.NAME,
+                hadoopConfiguration.get(PROVIDER_CONFIG_NAME));
+    }
+
+    /** An existing provider is kept, with the dynamic credentials provider placed first. */
+    @Test
+    public void updateHadoopConfigShouldPrependProviderWhenNotEmpty() {
+        org.apache.hadoop.conf.Configuration hadoopConfiguration =
+                new org.apache.hadoop.conf.Configuration();
+        hadoopConfiguration.set(PROVIDER_CONFIG_NAME, PROVIDER_CLASS_NAME);
+        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        String[] providers = hadoopConfiguration.get(PROVIDER_CONFIG_NAME).split(",");
+        assertEquals(2, providers.length);
+        assertEquals(DynamicTemporaryAWSCredentialsProvider.NAME, providers[0]);
+        assertEquals(PROVIDER_CLASS_NAME, providers[1]);
+    }
+
+    /** The dynamic credentials provider is not added a second time when already listed. */
+    @Test
+    public void updateHadoopConfigShouldNotAddProviderWhenAlreadyExists() {
+        org.apache.hadoop.conf.Configuration hadoopConfiguration =
+                new org.apache.hadoop.conf.Configuration();
+        hadoopConfiguration.set(PROVIDER_CONFIG_NAME, DynamicTemporaryAWSCredentialsProvider.NAME);
+        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        assertEquals(
+                DynamicTemporaryAWSCredentialsProvider.NAME,
+                hadoopConfiguration.get(PROVIDER_CONFIG_NAME));
+    }
+
+    /** Without a configured region the Hadoop region key stays unset. */
+    @Test
+    public void updateHadoopConfigShouldNotUpdateRegionWhenNotConfigured() {
+        S3DelegationTokenReceiver receiver = new S3DelegationTokenReceiver();
+        receiver.init(new Configuration());
+
+        org.apache.hadoop.conf.Configuration hadoopConfiguration =
+                new org.apache.hadoop.conf.Configuration();
+        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        assertNull(hadoopConfiguration.get(REGION_CONFIG_NAME));
+    }
+
+    /** A region given in the Flink configuration is copied into the Hadoop region key. */
+    @Test
+    public void updateHadoopConfigShouldUpdateRegionWhenConfigured() {
+        S3DelegationTokenReceiver receiver = new S3DelegationTokenReceiver();
+        Configuration configuration = new Configuration();
+        configuration.setString(CONFIG_PREFIX + ".s3.region", REGION);
+        receiver.init(configuration);
+
+        org.apache.hadoop.conf.Configuration hadoopConfiguration =
+                new org.apache.hadoop.conf.Configuration();
+        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        assertEquals(REGION, hadoopConfiguration.get(REGION_CONFIG_NAME));
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 92c1a5ff79d..094a8c85a2b 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -390,7 +390,10 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
heartbeatServices = createHeartbeatServices(configuration);
delegationTokenManager =
DefaultDelegationTokenManagerFactory.create(
- configuration, commonRpcService.getScheduledExecutor(), ioExecutor);
+ configuration,
+ pluginManager,
+ commonRpcService.getScheduledExecutor(),
+ ioExecutor);
metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);
final RpcService metricQueryServiceRpcService =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1d48126e141..ad708d96605 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -433,10 +433,14 @@ public class MiniCluster implements AutoCloseableAsync {
delegationTokenManager =
DefaultDelegationTokenManagerFactory.create(
- configuration, commonRpcService.getScheduledExecutor(), ioExecutor);
+ configuration,
+ miniClusterConfiguration.getPluginManager(),
+ commonRpcService.getScheduledExecutor(),
+ ioExecutor);
delegationTokenReceiverRepository =
- new DelegationTokenReceiverRepository(configuration);
+ new DelegationTokenReceiverRepository(
+ configuration, miniClusterConfiguration.getPluginManager());
blobCacheService =
BlobUtils.createBlobCacheService(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
index fb5a195d2cc..b59abbe59cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.security.token;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -41,10 +44,12 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.stream.Stream;
import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF;
import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO;
+import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -70,6 +75,8 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager {
private final Configuration configuration;
+ @Nullable private final PluginManager pluginManager;
+
private final double tokensRenewalTimeRatio;
private final long renewalRetryBackoffPeriod;
@@ -92,15 +99,17 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager {
public DefaultDelegationTokenManager(
Configuration configuration,
+ @Nullable PluginManager pluginManager,
@Nullable ScheduledExecutor scheduledExecutor,
@Nullable ExecutorService ioExecutor) {
this.configuration = checkNotNull(configuration, "Flink configuration must not be null");
+ this.pluginManager = pluginManager;
this.tokensRenewalTimeRatio = configuration.get(DELEGATION_TOKENS_RENEWAL_TIME_RATIO);
this.renewalRetryBackoffPeriod =
configuration.get(DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
this.delegationTokenProviders = loadProviders();
this.delegationTokenReceiverRepository =
- new DelegationTokenReceiverRepository(configuration);
+ new DelegationTokenReceiverRepository(configuration, pluginManager);
this.scheduledExecutor = scheduledExecutor;
this.ioExecutor = ioExecutor;
checkProviderAndReceiverConsistency(
@@ -111,36 +120,39 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager {
private Map<String, DelegationTokenProvider> loadProviders() {
LOG.info("Loading delegation token providers");
- ServiceLoader<DelegationTokenProvider> serviceLoader =
- ServiceLoader.load(DelegationTokenProvider.class);
-
Map<String, DelegationTokenProvider> providers = new HashMap<>();
- for (DelegationTokenProvider provider : serviceLoader) {
- try {
- if (isProviderEnabled(configuration, provider.serviceName())) {
- provider.init(configuration);
- LOG.info(
- "Delegation token provider {} loaded and initialized",
- provider.serviceName());
- checkState(
- !providers.containsKey(provider.serviceName()),
- "Delegation token provider with service name {} has multiple implementations",
- provider.serviceName());
- providers.put(provider.serviceName(), provider);
- } else {
- LOG.info(
- "Delegation token provider {} is disabled so not loaded",
- provider.serviceName());
- }
- } catch (Exception | NoClassDefFoundError e) {
- // The intentional general rule is that if a provider's init method throws exception
- // then stop the workload
- LOG.error(
- "Failed to initialize delegation token provider {}",
- provider.serviceName(),
- e);
- throw new FlinkRuntimeException(e);
- }
+        // Registers one discovered provider: an enabled provider is initialized and stored
+        // under its service name, a disabled one is only logged. Any failure is fatal.
+        Consumer<DelegationTokenProvider> loadProvider =
+                (provider) -> {
+                    try {
+                        if (isProviderEnabled(configuration, provider.serviceName())) {
+                            provider.init(configuration);
+                            LOG.info(
+                                    "Delegation token provider {} loaded and initialized",
+                                    provider.serviceName());
+                            // Preconditions.checkState substitutes %s placeholders, not the
+                            // SLF4J-style {} used by the logger calls.
+                            checkState(
+                                    !providers.containsKey(provider.serviceName()),
+                                    "Delegation token provider with service name %s has multiple implementations",
+                                    provider.serviceName());
+                            providers.put(provider.serviceName(), provider);
+                        } else {
+                            LOG.info(
+                                    "Delegation token provider {} is disabled so not loaded",
+                                    provider.serviceName());
+                        }
+                    } catch (Exception | NoClassDefFoundError e) {
+                        // The intentional general rule is that if a provider's init method
+                        // throws an exception then the workload is stopped
+                        LOG.error(
+                                "Failed to initialize delegation token provider {}",
+                                provider.serviceName(),
+                                e);
+                        throw new FlinkRuntimeException(e);
+                    }
+                };
+ ServiceLoader.load(DelegationTokenProvider.class).iterator().forEachRemaining(loadProvider);
+ if (pluginManager != null) {
+ pluginManager.load(DelegationTokenProvider.class).forEachRemaining(loadProvider);
}
LOG.info("Delegation token providers loaded successfully");
@@ -150,7 +162,7 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager {
static boolean isProviderEnabled(Configuration configuration, String serviceName) {
return configuration.getBoolean(
- String.format("security.delegation.token.provider.%s.enabled", serviceName), true);
+ String.format("%s.%s.enabled", CONFIG_PREFIX, serviceName), true);
}
@VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java
index 025b535bd67..fca48273516 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.security.token;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
@@ -40,12 +41,14 @@ public class DefaultDelegationTokenManagerFactory {
public static DelegationTokenManager create(
Configuration configuration,
+ @Nullable PluginManager pluginManager,
@Nullable ScheduledExecutor scheduledExecutor,
@Nullable ExecutorService ioExecutor)
throws IOException {
if (configuration.getBoolean(SecurityOptions.DELEGATION_TOKENS_ENABLED)) {
- return new DefaultDelegationTokenManager(configuration, scheduledExecutor, ioExecutor);
+ return new DefaultDelegationTokenManager(
+ configuration, pluginManager, scheduledExecutor, ioExecutor);
} else {
return new NoOpDelegationTokenManager();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java
index 8e63c3b4390..6d8f6710edc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java
@@ -21,15 +21,20 @@ package org.apache.flink.runtime.security.token;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
+import java.util.function.Consumer;
import static org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.isProviderEnabled;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -44,46 +49,53 @@ public class DelegationTokenReceiverRepository {
private final Configuration configuration;
+ @Nullable private final PluginManager pluginManager;
+
@VisibleForTesting final Map<String, DelegationTokenReceiver> delegationTokenReceivers;
- public DelegationTokenReceiverRepository(Configuration configuration) {
+ public DelegationTokenReceiverRepository(
+ Configuration configuration, @Nullable PluginManager pluginManager) {
this.configuration = checkNotNull(configuration, "Flink configuration must not be null");
+ this.pluginManager = pluginManager;
this.delegationTokenReceivers = loadReceivers();
}
private Map<String, DelegationTokenReceiver> loadReceivers() {
LOG.info("Loading delegation token receivers");
- ServiceLoader<DelegationTokenReceiver> serviceLoader =
- ServiceLoader.load(DelegationTokenReceiver.class);
-
Map<String, DelegationTokenReceiver> receivers = new HashMap<>();
- for (DelegationTokenReceiver receiver : serviceLoader) {
- try {
- if (isProviderEnabled(configuration, receiver.serviceName())) {
- receiver.init(configuration);
- LOG.info(
- "Delegation token receiver {} loaded and initialized",
- receiver.serviceName());
- checkState(
- !receivers.containsKey(receiver.serviceName()),
- "Delegation token receiver with service name {} has multiple implementations",
- receiver.serviceName());
- receivers.put(receiver.serviceName(), receiver);
- } else {
- LOG.info(
- "Delegation token receiver {} is disabled so not loaded",
- receiver.serviceName());
- }
- } catch (Exception | NoClassDefFoundError e) {
- // The intentional general rule is that if a receiver's init method throws exception
- // then stop the workload
- LOG.error(
- "Failed to initialize delegation token receiver {}",
- receiver.serviceName(),
- e);
- throw new FlinkRuntimeException(e);
- }
+        // Registers one discovered receiver: an enabled receiver is initialized and stored
+        // under its service name, a disabled one is only logged. Any failure is fatal.
+        Consumer<DelegationTokenReceiver> loadReceiver =
+                (receiver) -> {
+                    try {
+                        if (isProviderEnabled(configuration, receiver.serviceName())) {
+                            receiver.init(configuration);
+                            LOG.info(
+                                    "Delegation token receiver {} loaded and initialized",
+                                    receiver.serviceName());
+                            // Preconditions.checkState substitutes %s placeholders, not the
+                            // SLF4J-style {} used by the logger calls.
+                            checkState(
+                                    !receivers.containsKey(receiver.serviceName()),
+                                    "Delegation token receiver with service name %s has multiple implementations",
+                                    receiver.serviceName());
+                            receivers.put(receiver.serviceName(), receiver);
+                        } else {
+                            LOG.info(
+                                    "Delegation token receiver {} is disabled so not loaded",
+                                    receiver.serviceName());
+                        }
+                    } catch (Exception | NoClassDefFoundError e) {
+                        // The intentional general rule is that if a receiver's init method
+                        // throws an exception then the workload is stopped
+                        LOG.error(
+                                "Failed to initialize delegation token receiver {}",
+                                receiver.serviceName(),
+                                e);
+                        throw new FlinkRuntimeException(e);
+                    }
+                };
+ ServiceLoader.load(DelegationTokenReceiver.class).iterator().forEachRemaining(loadReceiver);
+ if (pluginManager != null) {
+ pluginManager.load(DelegationTokenReceiver.class).forEachRemaining(loadReceiver);
}
LOG.info("Delegation token receivers loaded successfully");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
index 7e8dd66ca54..cdabe8a9d12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.security.token.DelegationTokenProvider;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.Preconditions;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java
index 0b1cba0f738..f5d912b2b85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.security.token.DelegationTokenReceiver;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
index 23202d27df9..960a80509d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.runtime.security.token.DelegationTokenProvider;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.FlinkRuntimeException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index a7207e1fd8c..0bbd2d9ee6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -243,7 +243,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
configuration, pluginManager);
final DelegationTokenReceiverRepository delegationTokenReceiverRepository =
- new DelegationTokenReceiverRepository(configuration);
+ new DelegationTokenReceiverRepository(configuration, pluginManager);
taskExecutorService =
taskExecutorServiceFactory.createTaskExecutor(
diff --git a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
similarity index 100%
rename from flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
rename to flink-runtime/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
diff --git a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
similarity index 100%
rename from flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
rename to flink-runtime/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
index ed34ea978de..e211389573b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.security.token;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -35,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static java.time.Instant.ofEpochMilli;
import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO;
-import static org.apache.flink.runtime.security.token.DelegationTokenProvider.CONFIG_PREFIX;
+import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -73,7 +75,8 @@ public class DefaultDelegationTokenManagerTest {
@Test
public void configurationIsNullMustFailFast() {
- assertThrows(Exception.class, () -> new DefaultDelegationTokenManager(null, null, null));
+ assertThrows(
+ Exception.class, () -> new DefaultDelegationTokenManager(null, null, null, null));
}
@Test
@@ -82,7 +85,7 @@ public class DefaultDelegationTokenManagerTest {
Exception.class,
() -> {
ExceptionThrowingDelegationTokenProvider.throwInInit = true;
- new DefaultDelegationTokenManager(new Configuration(), null, null);
+ new DefaultDelegationTokenManager(new Configuration(), null, null, null);
});
}
@@ -91,7 +94,7 @@ public class DefaultDelegationTokenManagerTest {
Configuration configuration = new Configuration();
configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false);
DefaultDelegationTokenManager delegationTokenManager =
- new DefaultDelegationTokenManager(configuration, null, null);
+ new DefaultDelegationTokenManager(configuration, null, null, null);
assertEquals(3, delegationTokenManager.delegationTokenProviders.size());
@@ -171,7 +174,8 @@ public class DefaultDelegationTokenManagerTest {
configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", true);
AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0);
DefaultDelegationTokenManager delegationTokenManager =
- new DefaultDelegationTokenManager(configuration, scheduledExecutor, scheduler) {
+ new DefaultDelegationTokenManager(
+ configuration, null, scheduledExecutor, scheduler) {
@Override
void startTokensUpdate() {
startTokensUpdateCallCount.incrementAndGet();
@@ -197,7 +201,7 @@ public class DefaultDelegationTokenManagerTest {
configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false);
configuration.set(DELEGATION_TOKENS_RENEWAL_TIME_RATIO, 0.5);
DefaultDelegationTokenManager delegationTokenManager =
- new DefaultDelegationTokenManager(configuration, null, null);
+ new DefaultDelegationTokenManager(configuration, null, null, null);
Clock constantClock = Clock.fixed(ofEpochMilli(100), ZoneId.systemDefault());
assertEquals(50, delegationTokenManager.calculateRenewalDelay(constantClock, 200));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
index db93128d7ab..b1605226bd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
@@ -24,7 +24,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import static org.apache.flink.runtime.security.token.DelegationTokenProvider.CONFIG_PREFIX;
+import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -45,7 +45,7 @@ class DelegationTokenReceiverRepositoryTest {
@Test
public void configurationIsNullMustFailFast() {
- assertThrows(Exception.class, () -> new DelegationTokenReceiverRepository(null));
+ assertThrows(Exception.class, () -> new DelegationTokenReceiverRepository(null, null));
}
@Test
@@ -54,7 +54,7 @@ class DelegationTokenReceiverRepositoryTest {
Exception.class,
() -> {
ExceptionThrowingDelegationTokenReceiver.throwInInit = true;
- new DelegationTokenReceiverRepository(new Configuration());
+ new DelegationTokenReceiverRepository(new Configuration(), null);
});
}
@@ -63,7 +63,7 @@ class DelegationTokenReceiverRepositoryTest {
Configuration configuration = new Configuration();
configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false);
DelegationTokenReceiverRepository delegationTokenReceiverRepository =
- new DelegationTokenReceiverRepository(configuration);
+ new DelegationTokenReceiverRepository(configuration, null);
assertEquals(3, delegationTokenReceiverRepository.delegationTokenReceivers.size());
assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("hadoopfs"));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
index 05b2875a8d1..93a78420056 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.security.token;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
import java.util.Optional;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
index 41e72c11cce..f1d919f34dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.security.token;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
/**
* An example implementation of {@link DelegationTokenReceiver} which throws exception when enabled.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java
index a5cf75d3dc4..384bf0881bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.security.token;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
/** An example implementation of {@link DelegationTokenProvider} which does nothing. */
public class TestDelegationTokenProvider implements DelegationTokenProvider {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java
index 405d026a3a1..5a6c4b5e025 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.security.token;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
/** An example implementation of {@link DelegationTokenReceiver} which does nothing. */
public class TestDelegationTokenReceiver implements DelegationTokenReceiver {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
index 362b4de5957..7ef93ab4a3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
@@ -145,7 +145,7 @@ public class TaskExecutorBuilder {
}
final DelegationTokenReceiverRepository delegationTokenReceiverRepository =
- new DelegationTokenReceiverRepository(configuration);
+ new DelegationTokenReceiverRepository(configuration, null);
return new TaskExecutor(
rpcService,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
index 6933d29b551..caf60130b5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
@@ -253,7 +253,7 @@ public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogge
NoOpTaskExecutorBlobService.INSTANCE,
testingFatalErrorHandlerResource.getFatalErrorHandler(),
new TestingTaskExecutorPartitionTracker(),
- new DelegationTokenReceiverRepository(configuration));
+ new DelegationTokenReceiverRepository(configuration, null));
}
private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobID jobId)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index e2afdfb703a..9d31376451c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -601,7 +601,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
NoOpTaskExecutorBlobService.INSTANCE,
new TestingFatalErrorHandler(),
partitionTracker,
- new DelegationTokenReceiverRepository(configuration));
+ new DelegationTokenReceiverRepository(configuration, null));
}
private static TaskSlotTable<Task> createTaskSlotTable() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
index 55cb4f7c6b5..9a1d15f0fd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
@@ -238,7 +238,7 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger {
NoOpTaskExecutorBlobService.INSTANCE,
testingFatalErrorHandlerResource.getFatalErrorHandler(),
new TestingTaskExecutorPartitionTracker(),
- new DelegationTokenReceiverRepository(configuration));
+ new DelegationTokenReceiverRepository(configuration, null));
}
private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 6d3a885ca43..c13e19a121c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -2814,7 +2814,7 @@ public class TaskExecutorTest extends TestLogger {
NoOpTaskExecutorBlobService.INSTANCE,
testingFatalErrorHandler,
taskExecutorPartitionTracker,
- new DelegationTokenReceiverRepository(configuration));
+ new DelegationTokenReceiverRepository(configuration, null));
}
private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices)
@@ -2850,7 +2850,7 @@ public class TaskExecutorTest extends TestLogger {
NoOpTaskExecutorBlobService.INSTANCE,
testingFatalErrorHandler,
new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
- new DelegationTokenReceiverRepository(configuration));
+ new DelegationTokenReceiverRepository(configuration, null));
}
private TaskExecutorTestingContext createTaskExecutorTestingContext(int numberOfSlots)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index 79ff1dc541d..d2e113a4057 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -294,6 +294,6 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
workingDirectory,
error -> {},
- new DelegationTokenReceiverRepository(configuration));
+ new DelegationTokenReceiverRepository(configuration, null));
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 70f42962e51..ebfd25ac3e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -270,7 +270,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
taskExecutorBlobService,
testingFatalErrorHandler,
new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
- new DelegationTokenReceiverRepository(configuration));
+ new DelegationTokenReceiverRepository(configuration, null));
}
private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
similarity index 100%
rename from flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
rename to flink-runtime/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
similarity index 100%
rename from flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
rename to flink-runtime/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
diff --git a/flink-tests/src/test/java/org/apache/flink/test/plugin/DefaultPluginManagerTest.java b/flink-tests/src/test/java/org/apache/flink/test/plugin/DefaultPluginManagerTest.java
index 05af53f2b39..5ec9bc670e9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/plugin/DefaultPluginManagerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/DefaultPluginManagerTest.java
@@ -23,11 +23,11 @@ import org.apache.flink.core.plugin.DirectoryBasedPluginFinder;
import org.apache.flink.core.plugin.PluginDescriptor;
import org.apache.flink.core.plugin.PluginFinder;
import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.test.plugin.jar.pluginb.TestServiceB;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -42,6 +42,12 @@ import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** Test for {@link DefaultPluginManager}. */
public class DefaultPluginManagerTest extends PluginTestBase {
@@ -86,22 +92,45 @@ public class DefaultPluginManagerTest extends PluginTestBase {
final PluginManager pluginManager =
new DefaultPluginManager(descriptors, PARENT_CLASS_LOADER, parentPatterns);
final List<TestSpi> serviceImplList = Lists.newArrayList(pluginManager.load(TestSpi.class));
- Assert.assertEquals(2, serviceImplList.size());
+ assertEquals(2, serviceImplList.size());
// check that all impl have unique classloader
final Set<ClassLoader> classLoaders = Collections.newSetFromMap(new IdentityHashMap<>(3));
classLoaders.add(PARENT_CLASS_LOADER);
for (TestSpi testSpi : serviceImplList) {
- Assert.assertNotNull(testSpi.testMethod());
- Assert.assertTrue(classLoaders.add(testSpi.getClass().getClassLoader()));
+ assertNotNull(testSpi.testMethod());
+ assertTrue(classLoaders.add(testSpi.getClass().getClassLoader()));
}
final List<OtherTestSpi> otherServiceImplList =
Lists.newArrayList(pluginManager.load(OtherTestSpi.class));
- Assert.assertEquals(1, otherServiceImplList.size());
+ assertEquals(1, otherServiceImplList.size());
for (OtherTestSpi otherTestSpi : otherServiceImplList) {
- Assert.assertNotNull(otherTestSpi.otherTestMethod());
- Assert.assertTrue(classLoaders.add(otherTestSpi.getClass().getClassLoader()));
+ assertNotNull(otherTestSpi.otherTestMethod());
+ assertFalse(classLoaders.add(otherTestSpi.getClass().getClassLoader()));
}
}
+
+ @Test
+ public void classLoaderMustBeTheSameInsideAPlugin() {
+ String[] parentPatterns = {TestSpi.class.getName(), OtherTestSpi.class.getName()};
+ final PluginManager pluginManager =
+ new DefaultPluginManager(descriptors, PARENT_CLASS_LOADER, parentPatterns);
+ final List<TestSpi> serviceImplList = Lists.newArrayList(pluginManager.load(TestSpi.class));
+ assertEquals(2, serviceImplList.size());
+
+ final List<OtherTestSpi> otherServiceImplList =
+ Lists.newArrayList(pluginManager.load(OtherTestSpi.class));
+ assertEquals(1, otherServiceImplList.size());
+
+ // instanceof with multiple classloaders works only this way
+ final List<TestSpi> serviceBImplList =
+ serviceImplList.stream()
+ .filter(s -> s.getClass().getName().equals(TestServiceB.class.getName()))
+ .collect(Collectors.toList());
+ assertEquals(1, serviceBImplList.size());
+ assertEquals(
+ otherServiceImplList.get(0).getClass().getClassLoader(),
+ serviceBImplList.get(0).getClass().getClassLoader());
+ }
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 2653fc46d9f..3f4d121ca71 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1296,7 +1296,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
LOG.info("Adding delegation tokens to the AM container.");
DelegationTokenManager delegationTokenManager =
- new DefaultDelegationTokenManager(flinkConfiguration, null, null);
+ new DefaultDelegationTokenManager(flinkConfiguration, null, null, null);
DelegationTokenContainer container = new DelegationTokenContainer();
delegationTokenManager.obtainDelegationTokens(container);