You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Arthur Mantelato Rosa <ar...@gmail.com> on 2023/04/07 10:26:53 UTC

Delegation Tokens config - Upgrade from 1.16.x to 1.17.0

Hi all,

I'm upgrading an application from Flink 1.16.1 to 1.17.0 and I noticed that
delegation tokens (DTs) configuration [1] seems to have started to be
mandatory. Is my understanding correct?

I found this announcement [2] saying that from 1.17.0 version DTs are
enabled by default [3] but it would be good to have something related to it
in the 1.17.0 release notes [4] if that's the case. Perhaps making it
disabled by default would be better.

For instance, if you try to run the TopSpeedWindowing streaming example [5]
against a fresh downloaded 1.17.0 distribution, you should get an error
message like this:

2023-04-07 09:18:32,814 [main] ERROR
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] -
Failed to initialize delegation token provider s3
java.lang.IllegalStateException: Delegation token provider with service
name {} has multiple implementations [s3]
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
~[flink-dist-1.17.0.jar:1.17.0]
at
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.lambda$loadProviders$0(DefaultDelegationTokenManager.java:133)
~[flink-dist-1.17.0.jar:1.17.0]
at java.util.Iterator.forEachRemaining(Unknown Source) ~[?:?]
at
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.loadProviders(DefaultDelegationTokenManager.java:156)
~[flink-dist-1.17.0.jar:1.17.0]
at
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.<init>(DefaultDelegationTokenManager.java:111)
~[flink-dist-1.17.0.jar:1.17.0]
at
org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory.create(DefaultDelegationTokenManagerFactory.java:50)
~[flink-dist-1.17.0.jar:1.17.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:392)
~[flink-dist-1.17.0.jar:1.17.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
~[flink-dist-1.17.0.jar:1.17.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
~[flink-dist-1.17.0.jar:1.17.0]
at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
at javax.security.auth.Subject.doAs(Unknown Source) [?:?]
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
[hadoop-common-2.8.5.jar:?]
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
[flink-dist-1.17.0.jar:1.17.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
[flink-dist-1.17.0.jar:1.17.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
[flink-dist-1.17.0.jar:1.17.0]
at
org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:82)
[flink-dist-1.17.0.jar:1.17.0]
2023-04-07 09:18:32,824 [main] INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting
StandaloneApplicationClusterEntryPoint down with application status FAILED.
Diagnostics org.apache.flink.util.FlinkRuntimeException:
java.lang.IllegalStateException: Delegation token provider with service
name {} has multiple implementations [s3]
at
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.lambda$loadProviders$0(DefaultDelegationTokenManager.java:151)
at java.base/java.util.Iterator.forEachRemaining(Unknown Source)
at
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.loadProviders(DefaultDelegationTokenManager.java:156)
at
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.<init>(DefaultDelegationTokenManager.java:111)
at
org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory.create(DefaultDelegationTokenManagerFactory.java:50)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:392)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Unknown Source)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
at
org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:82)
Caused by: java.lang.IllegalStateException: Delegation token provider with
service name {} has multiple implementations [s3]
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
at
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.lambda$loadProviders$0(DefaultDelegationTokenManager.java:133)
... 14 more

I only managed to make the example run by setting the configuration
"security.delegation.token.provider.s3.enabled" to "false".

Refs:
[1]
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#auth-with-external-systems
[2]
https://flink.apache.org/2023/01/20/delegation-token-framework-obtain-distribute-and-use-temporary-credentials-automatically/
[3]
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#security-delegation-tokens-enabled
[4]
https://nightlies.apache.org/flink/flink-docs-release-1.17/release-notes/flink-1.17
[5]
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/resource-providers/standalone/overview/#application-mode

Best regards,
Arthur

Re: Delegation Tokens config - Upgrade from 1.16.x to 1.17.0

Posted by Gabor Somogyi <ga...@gmail.com>.
Hi Arthur,

Delegation tokens were enabled all the time which is not changed since it
would be a breaking change. I would personally turn it off by default but
it's important to keep original behavior.

The manager is loading providers at the very beginning of the init process.
It loads and initializes all the providers which are on the classpath +
there is a service loader registration for it.
This exception comes when the manager sees 2 instances of a provider for
the same service. I'm pretty sure there are 2 instances
of S3DelegationTokenProvider's on the classpath.
This can be achieved by adding flink-s3-fs-hadoop and flink-s3-fs-presto
plugins at the same time.

As mentioned setting "security.delegation.token.provider.s3.enabled" to
"false" is also a solution.

BR,
G


On Fri, Apr 7, 2023 at 12:27 PM Arthur Mantelato Rosa <
arthurmantelato@gmail.com> wrote:

> Hi all,
>
> I'm upgrading an application from Flink 1.16.1 to 1.17.0 and I noticed
> that delegation tokens (DTs) configuration [1] seems to have started to be
> mandatory. Is my understanding correct?
>
> I found this announcement [2] saying that from 1.17.0 version DTs are
> enabled by default [3] but it would be good to have something related to it
> in the 1.17.0 release notes [4] if that's the case. Perhaps making it
> disabled by default would be better.
>
> For instance, if you try to run the TopSpeedWindowing streaming example
> [5] against a fresh downloaded 1.17.0 distribution, you should get an error
> message like this:
>
> 2023-04-07 09:18:32,814 [main] ERROR
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] -
> Failed to initialize delegation token provider s3
> java.lang.IllegalStateException: Delegation token provider with service
> name {} has multiple implementations [s3]
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.lambda$loadProviders$0(DefaultDelegationTokenManager.java:133)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at java.util.Iterator.forEachRemaining(Unknown Source) ~[?:?]
> at
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.loadProviders(DefaultDelegationTokenManager.java:156)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.<init>(DefaultDelegationTokenManager.java:111)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory.create(DefaultDelegationTokenManagerFactory.java:50)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:392)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
> ~[flink-dist-1.17.0.jar:1.17.0]
> at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
> at javax.security.auth.Subject.doAs(Unknown Source) [?:?]
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
> [hadoop-common-2.8.5.jar:?]
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
> [flink-dist-1.17.0.jar:1.17.0]
> at
> org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:82)
> [flink-dist-1.17.0.jar:1.17.0]
> 2023-04-07 09:18:32,824 [main] INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting
> StandaloneApplicationClusterEntryPoint down with application status FAILED.
> Diagnostics org.apache.flink.util.FlinkRuntimeException:
> java.lang.IllegalStateException: Delegation token provider with service
> name {} has multiple implementations [s3]
> at
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.lambda$loadProviders$0(DefaultDelegationTokenManager.java:151)
> at java.base/java.util.Iterator.forEachRemaining(Unknown Source)
> at
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.loadProviders(DefaultDelegationTokenManager.java:156)
> at
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.<init>(DefaultDelegationTokenManager.java:111)
> at
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory.create(DefaultDelegationTokenManagerFactory.java:50)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:392)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
> at java.base/java.security.AccessController.doPrivileged(Native Method)
> at java.base/javax.security.auth.Subject.doAs(Unknown Source)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
> at
> org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:82)
> Caused by: java.lang.IllegalStateException: Delegation token provider with
> service name {} has multiple implementations [s3]
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> at
> org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.lambda$loadProviders$0(DefaultDelegationTokenManager.java:133)
> ... 14 more
>
> I only managed to make the example run by setting the configuration
> "security.delegation.token.provider.s3.enabled" to "false".
>
> Refs:
> [1]
> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#auth-with-external-systems
> [2]
> https://flink.apache.org/2023/01/20/delegation-token-framework-obtain-distribute-and-use-temporary-credentials-automatically/
> [3]
> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#security-delegation-tokens-enabled
> [4]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/release-notes/flink-1.17
> [5]
> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/resource-providers/standalone/overview/#application-mode
>
> Best regards,
> Arthur
>
>