You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2022/11/09 12:44:01 UTC

[flink] branch master updated: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 82170c852ce [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider
82170c852ce is described below

commit 82170c852ce803c0a84d613cc6e7c16df09992df
Author: Gabor Somogyi <ga...@apple.com>
AuthorDate: Wed Nov 9 13:43:52 2022 +0100

    [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider
    
    Co-authored-by: Jack <ja...@outlook.com>
---
 .../token/HBaseDelegationTokenProvider.java        | 165 +++++++++++++++++++++
 ....runtime.security.token.DelegationTokenProvider |   1 +
 .../KerberosDelegationTokenManagerITCase.java      |   3 +-
 .../java/org/apache/flink/yarn/YarnTestBase.java   |   5 +-
 4 files changed, 172 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java
new file mode 100644
index 00000000000..5633cb4f533
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Delegation token provider implementation for HBase.
+ *
+ * <p>All HBase classes are accessed through reflection. When they cannot be resolved, no HBase
+ * configuration is created and {@link #delegationTokensRequired()} returns {@code false}.
+ *
+ * <p>Ideally this class would live in flink-connector-hbase-base, but an HBase connection can be
+ * made without the connector, and moving the class would be a breaking change.
+ */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    /** HBase configuration created by {@link #init}; stays {@code null} if creation failed. */
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    /** Returns the name under which this provider is registered: {@code "hbase"}. */
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    /**
+     * Creates the HBase configuration from the given Flink configuration.
+     *
+     * @param configuration Flink configuration used to derive the Hadoop configuration
+     */
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    /**
+     * Reflectively calls {@code HBaseConfiguration.create(hadoopConf)}.
+     *
+     * @param conf Flink configuration used to derive the Hadoop configuration
+     * @return the HBase configuration, or {@code null} if the reflective call failed
+     */
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration createdConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            // HBaseConfiguration.create has been added to HBase in v0.90.0 so we can eliminate
+            // reflection magic when we drop ancient HBase support.
+            createdConf =
+                    (org.apache.hadoop.conf.Configuration)
+                            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                                    .getMethod("create", org.apache.hadoop.conf.Configuration.class)
+                                    .invoke(null, hadoopConf);
+            // ----
+        } catch (InvocationTargetException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | ClassNotFoundException e) {
+            // Best effort: a missing HBase dependency is expected and must not fail the caller.
+            LOG.info(
+                    "HBase is not available (not packaged with this application): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+        return createdConf;
+    }
+
+    /**
+     * Tells whether HBase delegation tokens have to be obtained.
+     *
+     * @return {@code true} only if Hadoop Kerberos security is enabled for the current user, an
+     *     HBase configuration is present and its {@code hbase.security.authentication} property
+     *     equals {@code kerberos}
+     */
+    @Override
+    public boolean delegationTokensRequired() {
+        try {
+            if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+                return false;
+            }
+        } catch (IOException e) {
+            LOG.debug("Hadoop Kerberos is not enabled.");
+            return false;
+        }
+        // Constant-first equals: get() yields null for an unset property, which must be treated
+        // as "not kerberos" instead of raising a NullPointerException.
+        return Objects.nonNull(hbaseConf)
+                && "kerberos".equals(hbaseConf.get("hbase.security.authentication"));
+    }
+
+    /**
+     * Obtains an HBase token through reflection and adds it to the given credentials.
+     *
+     * <p>The configuration based {@code TokenUtil.obtainToken} is tried first; if that method does
+     * not exist the connection based variant is used instead. Reflection and I/O failures are
+     * logged and do not propagate.
+     *
+     * @param credentials credentials the obtained token is added to
+     * @return always {@link Optional#empty()}
+     * @throws NullPointerException if no HBase configuration is present
+     */
+    @Override
+    public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
+        try {
+            Preconditions.checkNotNull(hbaseConf);
+            LOG.info("Obtaining Kerberos security token for HBase");
+            Token<?> token;
+            try {
+                token = obtainTokenFromConfiguration();
+            } catch (NoSuchMethodException e) {
+                // for HBase 2
+                token = obtainTokenFromConnection();
+            }
+            if (token == null) {
+                LOG.error("No Kerberos security token for HBase available");
+            } else {
+                credentials.addToken(token.getService(), token);
+                LOG.info("Added HBase Kerberos security token to credentials.");
+            }
+        } catch (ClassNotFoundException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | InvocationTargetException
+                | IOException e) {
+            LOG.info(
+                    "HBase is not available (failed to obtain delegation tokens): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+
+        // HBase currently does not support renewing the delegation token
+        // https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+HBaseTokenAuthentication
+        return Optional.empty();
+    }
+
+    /** Reflectively calls {@code TokenUtil.obtainToken(Configuration)}. */
+    private Token<?> obtainTokenFromConfiguration()
+            throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
+                    InvocationTargetException {
+        // ----
+        // Intended call: Token<AuthenticationTokenIdentifier> token =
+        // TokenUtil.obtainToken(conf);
+        return (Token<?>)
+                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                        .getMethod("obtainToken", org.apache.hadoop.conf.Configuration.class)
+                        .invoke(null, hbaseConf);
+    }
+
+    /**
+     * Reflectively calls {@code TokenUtil.obtainToken(Connection)} on a freshly created connection.
+     * The connection is closed on every exit path, including failures while obtaining the token.
+     */
+    private Token<?> obtainTokenFromConnection()
+            throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
+                    InvocationTargetException, IOException {
+        // ----
+        // Intended call: Connection connection = ConnectionFactory.createConnection(conf);
+        try (Closeable connection =
+                (Closeable)
+                        Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
+                                .getMethod(
+                                        "createConnection",
+                                        org.apache.hadoop.conf.Configuration.class)
+                                .invoke(null, hbaseConf)) {
+            // ----
+            Class<?> connectionClass = Class.forName("org.apache.hadoop.hbase.client.Connection");
+            // ----
+            // Intended call: Token<AuthenticationTokenIdentifier> token =
+            // TokenUtil.obtainToken(connection);
+            return (Token<?>)
+                    Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                            .getMethod("obtainToken", connectionClass)
+                            .invoke(null, connection);
+        }
+    }
+}
diff --git a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
index 56619ee46d5..0d58510a34a 100644
--- a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
+++ b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.runtime.security.token.HadoopFSDelegationTokenProvider
+org.apache.flink.runtime.security.token.HBaseDelegationTokenProvider
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
index 4fcfa29a031..0f114e10701 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
@@ -101,8 +101,9 @@ public class KerberosDelegationTokenManagerITCase {
         KerberosDelegationTokenManager delegationTokenManager =
                 new KerberosDelegationTokenManager(configuration, null, null);
 
-        assertEquals(2, delegationTokenManager.delegationTokenProviders.size());
+        assertEquals(3, delegationTokenManager.delegationTokenProviders.size());
         assertTrue(delegationTokenManager.isProviderLoaded("hadoopfs"));
+        assertTrue(delegationTokenManager.isProviderLoaded("hbase"));
         assertTrue(delegationTokenManager.isProviderLoaded("test"));
         assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
         assertFalse(delegationTokenManager.isProviderLoaded("throw"));
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index f540e96c9e9..640ff9f958c 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -170,7 +170,10 @@ public abstract class YarnTestBase {
         // this can happen during cluster shutdown, if AMRMClient happens to be heartbeating
         Pattern.compile("Exception on heartbeat"),
         Pattern.compile("java\\.io\\.InterruptedIOException: Call interrupted"),
-        Pattern.compile("java\\.lang\\.InterruptedException")
+        Pattern.compile("java\\.lang\\.InterruptedException"),
+
+        // this is logged by the HBase delegation token provider when HBase is not on the classpath
+        Pattern.compile("ClassNotFoundException : \"org.apache.hadoop.hbase.HBaseConfiguration\"")
     };
 
     // Temp directory which is deleted after the unit test.