You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2022/11/09 12:44:01 UTC
[flink] branch master updated: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider
This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 82170c852ce [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider
82170c852ce is described below
commit 82170c852ce803c0a84d613cc6e7c16df09992df
Author: Gabor Somogyi <ga...@apple.com>
AuthorDate: Wed Nov 9 13:43:52 2022 +0100
[FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider
Co-authored-by: Jack <ja...@outlook.com>
---
.../token/HBaseDelegationTokenProvider.java | 165 +++++++++++++++++++++
....runtime.security.token.DelegationTokenProvider | 1 +
.../KerberosDelegationTokenManagerITCase.java | 3 +-
.../java/org/apache/flink/yarn/YarnTestBase.java | 5 +-
4 files changed, 172 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java
new file mode 100644
index 00000000000..5633cb4f533
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Delegation token provider implementation for HBase. Basically it would be good to move this to
+ * flink-connector-hbase-base but HBase connection can be made without the connector. All in all I
+ * tend to move this but that would be a breaking change.
+ */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+ org.apache.hadoop.conf.Configuration hbaseConf;
+
+ @Override
+ public String serviceName() {
+ return "hbase";
+ }
+
+ @Override
+ public void init(Configuration configuration) throws Exception {
+ hbaseConf = getHBaseConfiguration(configuration);
+ }
+
+ private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+ org.apache.hadoop.conf.Configuration hbaseConf = null;
+ try {
+ org.apache.hadoop.conf.Configuration hadoopConf =
+ HadoopUtils.getHadoopConfiguration(conf);
+ // ----
+ // Intended call: HBaseConfiguration.create(conf);
+ // HBaseConfiguration.create has been added to HBase in v0.90.0 so we can eliminate
+ // reflection magic when we drop ancient HBase support.
+ hbaseConf =
+ (org.apache.hadoop.conf.Configuration)
+ Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+ .getMethod("create", org.apache.hadoop.conf.Configuration.class)
+ .invoke(null, hadoopConf);
+ // ----
+ } catch (InvocationTargetException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | ClassNotFoundException e) {
+ LOG.info(
+ "HBase is not available (not packaged with this application): {} : \"{}\".",
+ e.getClass().getSimpleName(),
+ e.getMessage());
+ }
+ return hbaseConf;
+ }
+
+ @Override
+ public boolean delegationTokensRequired() {
+ try {
+ if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+ return false;
+ }
+ } catch (IOException e) {
+ LOG.debug("Hadoop Kerberos is not enabled.");
+ return false;
+ }
+ return Objects.nonNull(hbaseConf)
+ && hbaseConf.get("hbase.security.authentication").equals("kerberos");
+ }
+
+ @Override
+ public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
+ Token<?> token;
+ try {
+ Preconditions.checkNotNull(hbaseConf);
+ try {
+ LOG.info("Obtaining Kerberos security token for HBase");
+ // ----
+ // Intended call: Token<AuthenticationTokenIdentifier> token =
+ // TokenUtil.obtainToken(conf);
+ token =
+ (Token<?>)
+ Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+ .getMethod(
+ "obtainToken",
+ org.apache.hadoop.conf.Configuration.class)
+ .invoke(null, hbaseConf);
+ } catch (NoSuchMethodException e) {
+ // for HBase 2
+
+ // ----
+ // Intended call: ConnectionFactory connectionFactory =
+ // ConnectionFactory.createConnection(conf);
+ Closeable connectionFactory =
+ (Closeable)
+ Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
+ .getMethod(
+ "createConnection",
+ org.apache.hadoop.conf.Configuration.class)
+ .invoke(null, hbaseConf);
+ // ----
+ Class<?> connectionClass =
+ Class.forName("org.apache.hadoop.hbase.client.Connection");
+ // ----
+ // Intended call: Token<AuthenticationTokenIdentifier> token =
+ // TokenUtil.obtainToken(connectionFactory);
+ token =
+ (Token<?>)
+ Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+ .getMethod("obtainToken", connectionClass)
+ .invoke(null, connectionFactory);
+ if (null != connectionFactory) {
+ connectionFactory.close();
+ }
+ }
+ if (token == null) {
+ LOG.error("No Kerberos security token for HBase available");
+ } else {
+ credentials.addToken(token.getService(), token);
+ LOG.info("Added HBase Kerberos security token to credentials.");
+ }
+ } catch (ClassNotFoundException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | InvocationTargetException
+ | IOException e) {
+ LOG.info(
+ "HBase is not available (failed to obtain delegation tokens): {} : \"{}\".",
+ e.getClass().getSimpleName(),
+ e.getMessage());
+ }
+
+ // HBase does not support to renew the delegation token currently
+ // https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+HBaseTokenAuthentication
+ return Optional.empty();
+ }
+}
diff --git a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
index 56619ee46d5..0d58510a34a 100644
--- a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
+++ b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.flink.runtime.security.token.HadoopFSDelegationTokenProvider
+org.apache.flink.runtime.security.token.HBaseDelegationTokenProvider
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
index 4fcfa29a031..0f114e10701 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
@@ -101,8 +101,9 @@ public class KerberosDelegationTokenManagerITCase {
KerberosDelegationTokenManager delegationTokenManager =
new KerberosDelegationTokenManager(configuration, null, null);
- assertEquals(2, delegationTokenManager.delegationTokenProviders.size());
+ assertEquals(3, delegationTokenManager.delegationTokenProviders.size());
assertTrue(delegationTokenManager.isProviderLoaded("hadoopfs"));
+ assertTrue(delegationTokenManager.isProviderLoaded("hbase"));
assertTrue(delegationTokenManager.isProviderLoaded("test"));
assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
assertFalse(delegationTokenManager.isProviderLoaded("throw"));
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index f540e96c9e9..640ff9f958c 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -170,7 +170,10 @@ public abstract class YarnTestBase {
// this can happen during cluster shutdown, if AMRMClient happens to be heartbeating
Pattern.compile("Exception on heartbeat"),
Pattern.compile("java\\.io\\.InterruptedIOException: Call interrupted"),
- Pattern.compile("java\\.lang\\.InterruptedException")
+ Pattern.compile("java\\.lang\\.InterruptedException"),
+
+ // this can happen if the hbase delegation token provider is not available
+ Pattern.compile("ClassNotFoundException : \"org.apache.hadoop.hbase.HBaseConfiguration\"")
};
// Temp directory which is deleted after the unit test.