Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/08 01:04:29 UTC

[GitHub] [flink] JackWangCS opened a new pull request, #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

JackWangCS opened a new pull request, #20206:
URL: https://github.com/apache/flink/pull/20206

   ## What is the purpose of the change
   
   This PR adds HBaseDelegationTokenProvider to obtain HBase Delegation tokens using the new delegation token framework.
   
   
   ## Brief change log
   
     - Add `HBaseDelegationTokenProvider`
     - Add `HBaseDelegationTokenProvider` to the `META-INF` service registration
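
For illustration, a Java `ServiceLoader` registration of this provider might look like the fragment below. The file path is an assumption inferred from the package declaration and the `implements DelegationTokenProvider` clause in the quoted diff; the actual file touched by the PR may differ.

```text
# Assumed location (not confirmed by the PR text):
# flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
org.apache.flink.runtime.security.token.HBaseDelegationTokenProvider
```

Each non-comment line in such a file names one implementation class, which `java.util.ServiceLoader` instantiates through its public no-argument constructor.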
   
   
   ## Verifying this change
    - The provider's functionality will be covered by some existing tests, but we don't have dedicated tests for obtaining HBase delegation tokens. I think we may need to add new end-to-end tests to cover it.
   
    
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? (JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917887104


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration hbaseConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            hbaseConf =
+                    (org.apache.hadoop.conf.Configuration)
+                            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                                    .getMethod("create", org.apache.hadoop.conf.Configuration.class)
+                                    .invoke(null, hadoopConf);
+            // ----
+
+        } catch (InvocationTargetException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | ClassNotFoundException e) {
+            LOG.info(
+                    "HBase is not available (not packaged with this application): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+        return hbaseConf;
+    }
+
+    @Override
+    public boolean delegationTokensRequired() throws Exception {
+        try {
+            if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+                return false;
+            }
+        } catch (IOException e) {
+            LOG.debug("Hadoop Kerberos is not enabled.");
+            return false;
+        }
+        return Objects.nonNull(hbaseConf)
+                && hbaseConf.get("hbase.security.authentication").equals("kerberos");
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
+        Token<?> token;
+        try {
+            Preconditions.checkNotNull(hbaseConf);
+            try {
+                LOG.info("Obtaining Kerberos security token for HBase");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(conf);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod(
+                                                "obtainToken",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+            } catch (NoSuchMethodException e) {
+                // for HBase 2
+
+                // ----
+                // Intended call: ConnectionFactory connectionFactory =
+                // ConnectionFactory.createConnection(conf);
+                Closeable connectionFactory =
+                        (Closeable)
+                                Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
+                                        .getMethod(
+                                                "createConnection",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+                // ----
+                Class<?> connectionClass =
+                        Class.forName("org.apache.hadoop.hbase.client.Connection");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(connectionFactory);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod("obtainToken", connectionClass)
+                                        .invoke(null, connectionFactory);

Review Comment:
   OK, then this must be mentioned in the upcoming documentation, or 1.x support needs to be added. I would vote for the first option (documenting it as a known limitation) unless somebody objects.





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917889238


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration hbaseConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            hbaseConf =
+                    (org.apache.hadoop.conf.Configuration)
+                            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                                    .getMethod("create", org.apache.hadoop.conf.Configuration.class)
+                                    .invoke(null, hadoopConf);
+            // ----
+
+        } catch (InvocationTargetException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | ClassNotFoundException e) {
+            LOG.info(
+                    "HBase is not available (not packaged with this application): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+        return hbaseConf;
+    }
+
+    @Override
+    public boolean delegationTokensRequired() throws Exception {
+        try {
+            if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+                return false;
+            }
+        } catch (IOException e) {
+            LOG.debug("Hadoop Kerberos is not enabled.");
+            return false;
+        }
+        return Objects.nonNull(hbaseConf)
+                && hbaseConf.get("hbase.security.authentication").equals("kerberos");
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
+        Token<?> token;
+        try {
+            Preconditions.checkNotNull(hbaseConf);
+            try {
+                LOG.info("Obtaining Kerberos security token for HBase");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(conf);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod(
+                                                "obtainToken",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+            } catch (NoSuchMethodException e) {
+                // for HBase 2
+
+                // ----
+                // Intended call: ConnectionFactory connectionFactory =
+                // ConnectionFactory.createConnection(conf);
+                Closeable connectionFactory =
+                        (Closeable)
+                                Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
+                                        .getMethod(
+                                                "createConnection",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+                // ----
+                Class<?> connectionClass =
+                        Class.forName("org.apache.hadoop.hbase.client.Connection");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(connectionFactory);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod("obtainToken", connectionClass)
+                                        .invoke(null, connectionFactory);
+                if (null != connectionFactory) {
+                    connectionFactory.close();
+                }
+            }
+            if (token == null) {
+                LOG.error("No Kerberos security token for HBase available");
+            } else {
+                credentials.addToken(token.getService(), token);
+                LOG.info("Added HBase Kerberos security token to credentials.");
+            }
+        } catch (ClassNotFoundException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | InvocationTargetException
+                | IOException e) {
+            LOG.info(
+                    "HBase is not available (failed to obtain delegation tokens): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+
+        // HBase does not support to renew the delegation token currently
+        // https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+HBaseTokenAuthentication
+        return Optional.empty();

Review Comment:
   Thanks! The field is definitely there, but the question is whether it's filled with proper info or not. I've had an informal chat with one of the HBase guys and was told that it's filled, but we really need to prove it via logs.





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917755542


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration hbaseConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            hbaseConf =
+                    (org.apache.hadoop.conf.Configuration)
+                            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                                    .getMethod("create", org.apache.hadoop.conf.Configuration.class)
+                                    .invoke(null, hadoopConf);
+            // ----
+
+        } catch (InvocationTargetException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | ClassNotFoundException e) {
+            LOG.info(
+                    "HBase is not available (not packaged with this application): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+        return hbaseConf;
+    }
+
+    @Override
+    public boolean delegationTokensRequired() throws Exception {
+        try {
+            if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+                return false;
+            }
+        } catch (IOException e) {
+            LOG.debug("Hadoop Kerberos is not enabled.");
+            return false;
+        }
+        return Objects.nonNull(hbaseConf)
+                && hbaseConf.get("hbase.security.authentication").equals("kerberos");
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
+        Token<?> token;
+        try {
+            Preconditions.checkNotNull(hbaseConf);
+            try {
+                LOG.info("Obtaining Kerberos security token for HBase");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(conf);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod(
+                                                "obtainToken",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+            } catch (NoSuchMethodException e) {
+                // for HBase 2
+
+                // ----
+                // Intended call: ConnectionFactory connectionFactory =
+                // ConnectionFactory.createConnection(conf);
+                Closeable connectionFactory =
+                        (Closeable)
+                                Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
+                                        .getMethod(
+                                                "createConnection",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+                // ----
+                Class<?> connectionClass =
+                        Class.forName("org.apache.hadoop.hbase.client.Connection");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(connectionFactory);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod("obtainToken", connectionClass)
+                                        .invoke(null, connectionFactory);

Review Comment:
   Just another question for my own understanding. `obtainToken(Configuration conf)` is a deprecated method that was already removed in HBase 2.0.0, and because of that we use the `obtainToken(Connection conn)` API, which is good. That means Flink supports HBase 2.0.0+, right?
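
The overload fallback discussed here can be sketched in isolation. The example below is a minimal, self-contained illustration that uses only JDK classes: `FakeConnection`, `LegacyApi`, and `CurrentApi` are hypothetical stand-ins and not HBase types, and the sketch makes no claim about which overloads a given HBase release ships. It looks up the legacy overload first, falls back to the connection-based overload on `NoSuchMethodException`, and closes the connection with try-with-resources so that it is released even when the reflective call fails.

```java
import java.io.Closeable;
import java.lang.reflect.Method;

public class ReflectiveFallbackSketch {

    /** Hypothetical stand-in for a client connection; not an HBase class. */
    public static final class FakeConnection implements Closeable {
        private final String conf;

        public FakeConnection(String conf) {
            this.conf = conf;
        }

        @Override
        public void close() {
            // Nothing to release in this sketch.
        }

        @Override
        public String toString() {
            return "connection(" + conf + ")";
        }
    }

    /** Hypothetical API that still offers the configuration-based overload. */
    public static final class LegacyApi {
        public static String obtainToken(String conf) {
            return "legacy:" + conf;
        }
    }

    /** Hypothetical API that only offers the connection-based overload. */
    public static final class CurrentApi {
        public static String obtainToken(FakeConnection connection) {
            return "current:" + connection;
        }
    }

    /** Tries the legacy overload first and falls back to the connection-based one. */
    public static String obtainToken(Class<?> api, String conf) {
        try {
            Method legacy;
            try {
                legacy = api.getMethod("obtainToken", String.class);
            } catch (NoSuchMethodException e) {
                // Legacy overload is absent: use the connection-based overload instead.
                Method current = api.getMethod("obtainToken", FakeConnection.class);
                try (FakeConnection connection = new FakeConnection(conf)) {
                    return (String) current.invoke(null, connection);
                }
            }
            return (String) legacy.invoke(null, conf);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("No usable obtainToken overload", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(obtainToken(LegacyApi.class, "conf"));  // legacy:conf
        System.out.println(obtainToken(CurrentApi.class, "conf")); // current:connection(conf)
    }
}
```

Separating the method lookup from the invocation keeps a `NoSuchMethodException` raised by the lookup distinct from failures inside the invoked method, which surface as `InvocationTargetException`.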





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917703989


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {

Review Comment:
   The question is more like: is it possible to connect to HBase (and maybe use the obtained token) without the HBase connector or not? If the connector is a must for making a connection, then it makes sense to encapsulate everything in the connector.






[GitHub] [flink] gaborgsomogyi commented on pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #20206:
URL: https://github.com/apache/flink/pull/20206#issuecomment-1286762134

   @JackWangCS [This](https://github.com/apache/flink/pull/20206#discussion_r928147951) is the last open question before this can be merged. Are you actively working on this?




[GitHub] [flink] JackWangCS commented on pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
JackWangCS commented on PR #20206:
URL: https://github.com/apache/flink/pull/20206#issuecomment-1193143938

   Some updates: 
   
   - Finished tests with HBase 1.x on a Yarn cluster. Found an issue obtaining tokens for HBase 2.2, with both the delegation token provider and `Utils.setTokensFor()`. See: https://gist.github.com/JackWangCS/3019355b9fc4746024fe72b3bbad839b
   - Started testing the feature on K8s.




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r928147951


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration hbaseConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            hbaseConf =
+                    (org.apache.hadoop.conf.Configuration)
+                            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                                    .getMethod("create", org.apache.hadoop.conf.Configuration.class)
+                                    .invoke(null, hadoopConf);
+            // ----
+
+        } catch (InvocationTargetException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | ClassNotFoundException e) {
+            LOG.info(
+                    "HBase is not available (not packaged with this application): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+        return hbaseConf;
+    }
+
+    @Override
+    public boolean delegationTokensRequired() throws Exception {
+        try {
+            if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+                return false;
+            }
+        } catch (IOException e) {
+            LOG.debug("Hadoop Kerberos is not enabled.");
+            return false;
+        }
+        return Objects.nonNull(hbaseConf)
+                && hbaseConf.get("hbase.security.authentication").equals("kerberos");
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
+        Token<?> token;
+        try {
+            Preconditions.checkNotNull(hbaseConf);
+            try {
+                LOG.info("Obtaining Kerberos security token for HBase");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(conf);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod(
+                                                "obtainToken",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+            } catch (NoSuchMethodException e) {
+                // for HBase 2
+
+                // ----
+                // Intended call: ConnectionFactory connectionFactory =
+                // ConnectionFactory.createConnection(conf);
+                Closeable connectionFactory =
+                        (Closeable)
+                                Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
+                                        .getMethod(
+                                                "createConnection",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+                // ----
+                Class<?> connectionClass =
+                        Class.forName("org.apache.hadoop.hbase.client.Connection");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(connectionFactory);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod("obtainToken", connectionClass)
+                                        .invoke(null, connectionFactory);
+                if (null != connectionFactory) {
+                    connectionFactory.close();
+                }
+            }
+            if (token == null) {
+                LOG.error("No Kerberos security token for HBase available");
+            } else {
+                credentials.addToken(token.getService(), token);
+                LOG.info("Added HBase Kerberos security token to credentials.");
+            }
+        } catch (ClassNotFoundException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | InvocationTargetException
+                | IOException e) {
+            LOG.info(
+                    "HBase is not available (failed to obtain delegation tokens): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+
+        // HBase does not support to renew the delegation token currently
+        // https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+HBaseTokenAuthentication
+        return Optional.empty();

Review Comment:
   I've been thinking about this in the meantime. It's clear that an HBase token cannot be renewed, which is fine. On the other hand, if we return `Optional.empty()` here, Flink will never re-obtain the HBase token when this is the only active token provider. I think it would be good to return the expiration date if it's set, since we're re-obtaining the token rather than renewing it.
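
A minimal sketch of the suggestion above, not the PR's implementation: map a token's expiration timestamp to the `Optional<Long>` that `obtainDelegationTokens` returns, so that a set expiration date gives the framework something to schedule a re-obtain against. The names `TokenExpiration` and `toRenewalDate` are hypothetical. The sketch assumes the expiration is available as epoch milliseconds and that a non-positive value means the field was not filled; how that value is read from the HBase token is left out.

```java
import java.util.Optional;

/** Hypothetical helper for illustration; not part of Flink or HBase. */
final class TokenExpiration {

    private TokenExpiration() {}

    /**
     * Returns the expiration date if it is set, otherwise empty.
     *
     * @param expirationMillis expiration as epoch milliseconds; values <= 0 are treated
     *     as "not filled" (an assumption of this sketch)
     */
    static Optional<Long> toRenewalDate(long expirationMillis) {
        return expirationMillis > 0 ? Optional.of(expirationMillis) : Optional.empty();
    }
}
```

With a helper like this, the method could end with `return TokenExpiration.toRenewalDate(expiration);` instead of the unconditional `Optional.empty()`, while still returning empty when no token was obtained.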





[GitHub] [flink] JackWangCS commented on pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
JackWangCS commented on PR #20206:
URL: https://github.com/apache/flink/pull/20206#issuecomment-1179551312

   I am trying to follow your approach to validate the HBase delegation token provider.




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r916675013


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        getHBaseConfiguration(configuration);

Review Comment:
   A minor thing, but I would change this as follows, which is perhaps clearer:
   `hbaseConf = getHBaseConfiguration(configuration);`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {

Review Comment:
   I think the class should be moved to `flink-connector-hbase-base` unless there is a blocking reason.
   For example, moving the Hadoop FS provider would have ended up in a circular dependency, so it was not possible to move it.
   





[GitHub] [flink] gaborgsomogyi commented on pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #20206:
URL: https://github.com/apache/flink/pull/20206#issuecomment-1178846915

   @JackWangCS it seems the code is not properly formatted, which is why the compilation failed. Please run `mvn spotless:apply` to auto-format the code, then
   ```
   git add --all
   git commit --amend
   git push -f
   ```
   to push the changes to the PR.
   




[GitHub] [flink] JackWangCS commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
JackWangCS commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917272181


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        getHBaseConfiguration(configuration);

Review Comment:
   I have changed `getHBaseConfiguration()` to return the configuration instead.





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917885725


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration hbaseConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            hbaseConf =
+                    (org.apache.hadoop.conf.Configuration)
+                            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                                    .getMethod("create", org.apache.hadoop.conf.Configuration.class)

Review Comment:
   I've taken a deeper look: until we move the provider into the HBase connector, it's not possible to remove the reflection (flink-runtime doesn't have any HBase dependency, and it should stay that way).
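
To illustrate why reflection is needed while the provider lives in a module without the HBase dependency, here is a minimal sketch of calling a static method on a class that may or may not be on the classpath. `ReflectiveCalls` and `invokeStatic` are hypothetical names, and the error handling differs from the PR (which logs and continues): this sketch returns empty when the class or method is missing and rethrows failures of the invoked method itself.

```java
import java.lang.reflect.InvocationTargetException;
import java.util.Optional;

/** Hypothetical helper illustrating an optional dependency reached via reflection. */
final class ReflectiveCalls {

    private ReflectiveCalls() {}

    /**
     * Invokes a public static one-argument method by name. Returns empty when the class
     * or method is not available, so the caller can degrade gracefully.
     */
    static Optional<Object> invokeStatic(
            String className, String methodName, Class<?> paramType, Object arg) {
        try {
            Object result =
                    Class.forName(className).getMethod(methodName, paramType).invoke(null, arg);
            return Optional.ofNullable(result);
        } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException e) {
            // The optional dependency is absent or has a different API.
            return Optional.empty();
        } catch (InvocationTargetException e) {
            // The method exists but threw; surface the underlying cause.
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

For example, `ReflectiveCalls.invokeStatic("java.lang.Integer", "parseInt", String.class, "42")` resolves a class that is always present, while a class name that is not on the classpath yields an empty result. Once the provider sits in a module that depends on HBase directly, a plain method call would replace all of this.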





[GitHub] [flink] gaborgsomogyi commented on pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #20206:
URL: https://github.com/apache/flink/pull/20206#issuecomment-1186998972

   I've commented on the gist to understand the issue and to propose a solution.
   BTW, I've created a secure HBase k8s instance which makes it possible to test with minikube.
   Please see my previous PRs where I linked the gist.




[GitHub] [flink] JackWangCS commented on pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
JackWangCS commented on PR #20206:
URL: https://github.com/apache/flink/pull/20206#issuecomment-1187003781

   Thanks, I will check your gist and run tests in both K8s and Yarn environments.




[GitHub] [flink] mbalassi closed pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
mbalassi closed pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider
URL: https://github.com/apache/flink/pull/20206




[GitHub] [flink] JackWangCS commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
JackWangCS commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917272387


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {

Review Comment:
   My main concern is that an HBase delegation token can currently be obtained without the `flink-connector-hbase-base` connector. If we move the provider to `flink-connector-hbase-base`, that behavior changes.





[GitHub] [flink] JackWangCS commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
JackWangCS commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917725250


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {

Review Comment:
   Currently, it's possible to connect to HBase without the connector; the connector is not required.





[GitHub] [flink] JackWangCS commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
JackWangCS commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r916381471


##########
flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider:
##########
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.runtime.security.token.HadoopFSDelegationTokenProvider
+org.apache.flink.runtime.security.token.HBaseDelegationTokenProvider

Review Comment:
   The HBaseDelegationTokenProvider is still placed here for compatibility. We can move it to hbase-connector in the future.
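
For readers unfamiliar with the `META-INF/services` registration touched in this hunk, here is a minimal sketch of how `java.util.ServiceLoader` discovers such entries. It is not Flink code: `ProviderDiscovery` and the nested `TokenProvider` interface are hypothetical stand-ins for the real `DelegationTokenProvider`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

class ProviderDiscovery {
    /** Hypothetical stand-in for a provider interface; not the Flink interface. */
    interface TokenProvider {
        String serviceName();
    }

    /** Collects the service names of every implementation registered for TokenProvider. */
    static List<String> discoverServiceNames() {
        List<String> names = new ArrayList<>();
        // ServiceLoader reads META-INF/services/<binary name of the interface> from the
        // classpath and instantiates each class listed there, one class name per line.
        for (TokenProvider provider : ServiceLoader.load(TokenProvider.class)) {
            names.add(provider.serviceName());
        }
        return names;
    }
}
```

With no registration file on the classpath the list is simply empty, which is why adding or removing a line in the services file is enough to enable or disable a provider.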





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917702652


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        getHBaseConfiguration(configuration);

Review Comment:
   This part looks good.





[GitHub] [flink] gaborgsomogyi commented on pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #20206:
URL: https://github.com/apache/flink/pull/20206#issuecomment-1180184746

   Based on the discussion, I'm fine with the current state of the code. The only thing I'm still waiting for is the log I described earlier.




[GitHub] [flink] JackWangCS commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
JackWangCS commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917880403


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration hbaseConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            hbaseConf =
+                    (org.apache.hadoop.conf.Configuration)
+                            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                                    .getMethod("create", org.apache.hadoop.conf.Configuration.class)
+                                    .invoke(null, hadoopConf);
+            // ----
+
+        } catch (InvocationTargetException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | ClassNotFoundException e) {
+            LOG.info(
+                    "HBase is not available (not packaged with this application): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+        return hbaseConf;
+    }
+
+    @Override
+    public boolean delegationTokensRequired() throws Exception {
+        try {
+            if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+                return false;
+            }
+        } catch (IOException e) {
+            LOG.debug("Hadoop Kerberos is not enabled.");
+            return false;
+        }
+        return Objects.nonNull(hbaseConf)
+                && hbaseConf.get("hbase.security.authentication").equals("kerberos");
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
+        Token<?> token;
+        try {
+            Preconditions.checkNotNull(hbaseConf);
+            try {
+                LOG.info("Obtaining Kerberos security token for HBase");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(conf);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod(
+                                                "obtainToken",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+            } catch (NoSuchMethodException e) {
+                // for HBase 2
+
+                // ----
+                // Intended call: ConnectionFactory connectionFactory =
+                // ConnectionFactory.createConnection(conf);
+                Closeable connectionFactory =
+                        (Closeable)
+                                Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
+                                        .getMethod(
+                                                "createConnection",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+                // ----
+                Class<?> connectionClass =
+                        Class.forName("org.apache.hadoop.hbase.client.Connection");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(connectionFactory);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod("obtainToken", connectionClass)
+                                        .invoke(null, connectionFactory);
+                if (null != connectionFactory) {
+                    connectionFactory.close();
+                }
+            }
+            if (token == null) {
+                LOG.error("No Kerberos security token for HBase available");
+            } else {
+                credentials.addToken(token.getService(), token);
+                LOG.info("Added HBase Kerberos security token to credentials.");
+            }
+        } catch (ClassNotFoundException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | InvocationTargetException
+                | IOException e) {
+            LOG.info(
+                    "HBase is not available (failed to obtain delegation tokens): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+
+        // HBase does not support to renew the delegation token currently
+        // https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+HBaseTokenAuthentication
+        return Optional.empty();

Review Comment:
   I need to run some tests to confirm this. If the token contains the expiration date, I could make the method return the renewal date.
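
As an illustration of the idea in this comment, the following sketch returns an expiration timestamp when the identifier object exposes one and `Optional.empty()` otherwise. It is an assumption-laden example, not the PR's code: `RenewalDate`, `StubIdentifier`, and the `getExpirationDate` accessor name are hypothetical and stand in for whatever the decoded HBase token identifier actually provides.

```java
import java.util.Optional;

class RenewalDate {
    /** Hypothetical stand-in for a decoded token identifier; not an HBase class. */
    static class StubIdentifier {
        private final long expirationDate;

        StubIdentifier(long expirationDate) {
            this.expirationDate = expirationDate;
        }

        public long getExpirationDate() {
            return expirationDate;
        }
    }

    /** Returns the expiration timestamp if the object has a public getExpirationDate(). */
    static Optional<Long> expirationOf(Object identifier) {
        try {
            Object value = identifier.getClass().getMethod("getExpirationDate").invoke(identifier);
            return value instanceof Long ? Optional.of((Long) value) : Optional.empty();
        } catch (ReflectiveOperationException e) {
            // No such accessor (or it failed): behave as "no renewal date known".
            return Optional.empty();
        }
    }
}
```

A provider could return such a value from `obtainDelegationTokens` instead of the unconditional `Optional.empty()`, provided testing confirms the token really carries the date.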





[GitHub] [flink] mbalassi commented on pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
mbalassi commented on PR #20206:
URL: https://github.com/apache/flink/pull/20206#issuecomment-1308692217

   Closing in favor of #21140.




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917887104


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration hbaseConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            hbaseConf =
+                    (org.apache.hadoop.conf.Configuration)
+                            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                                    .getMethod("create", org.apache.hadoop.conf.Configuration.class)
+                                    .invoke(null, hadoopConf);
+            // ----
+
+        } catch (InvocationTargetException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | ClassNotFoundException e) {
+            LOG.info(
+                    "HBase is not available (not packaged with this application): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+        return hbaseConf;
+    }
+
+    @Override
+    public boolean delegationTokensRequired() throws Exception {
+        try {
+            if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+                return false;
+            }
+        } catch (IOException e) {
+            LOG.debug("Hadoop Kerberos is not enabled.");
+            return false;
+        }
+        return Objects.nonNull(hbaseConf)
+                && hbaseConf.get("hbase.security.authentication").equals("kerberos");
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
+        Token<?> token;
+        try {
+            Preconditions.checkNotNull(hbaseConf);
+            try {
+                LOG.info("Obtaining Kerberos security token for HBase");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(conf);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod(
+                                                "obtainToken",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+            } catch (NoSuchMethodException e) {
+                // for HBase 2
+
+                // ----
+                // Intended call: ConnectionFactory connectionFactory =
+                // ConnectionFactory.createConnection(conf);
+                Closeable connectionFactory =
+                        (Closeable)
+                                Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
+                                        .getMethod(
+                                                "createConnection",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+                // ----
+                Class<?> connectionClass =
+                        Class.forName("org.apache.hadoop.hbase.client.Connection");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(connectionFactory);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod("obtainToken", connectionClass)
+                                        .invoke(null, connectionFactory);

Review Comment:
   OK, then either this must be mentioned in the upcoming documentation or 1.x support needs to be added. I would vote for the first option (documenting it as a known limitation) unless somebody objects.





[GitHub] [flink] JackWangCS commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
JackWangCS commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917878624


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration hbaseConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            hbaseConf =
+                    (org.apache.hadoop.conf.Configuration)
+                            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                                    .getMethod("create", org.apache.hadoop.conf.Configuration.class)
+                                    .invoke(null, hadoopConf);
+            // ----
+
+        } catch (InvocationTargetException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | ClassNotFoundException e) {
+            LOG.info(
+                    "HBase is not available (not packaged with this application): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+        return hbaseConf;
+    }
+
+    @Override
+    public boolean delegationTokensRequired() throws Exception {
+        try {
+            if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+                return false;
+            }
+        } catch (IOException e) {
+            LOG.debug("Hadoop Kerberos is not enabled.");
+            return false;
+        }
+        return Objects.nonNull(hbaseConf)
+                && hbaseConf.get("hbase.security.authentication").equals("kerberos");
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
+        Token<?> token;
+        try {
+            Preconditions.checkNotNull(hbaseConf);
+            try {
+                LOG.info("Obtaining Kerberos security token for HBase");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(conf);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod(
+                                                "obtainToken",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+            } catch (NoSuchMethodException e) {
+                // for HBase 2
+
+                // ----
+                // Intended call: ConnectionFactory connectionFactory =
+                // ConnectionFactory.createConnection(conf);
+                Closeable connectionFactory =
+                        (Closeable)
+                                Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
+                                        .getMethod(
+                                                "createConnection",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+                // ----
+                Class<?> connectionClass =
+                        Class.forName("org.apache.hadoop.hbase.client.Connection");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(connectionFactory);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod("obtainToken", connectionClass)
+                                        .invoke(null, connectionFactory);

Review Comment:
   Yes, it's supported for HBase 2.0.0+ now. Flink provides both HBase 1.0.0+ and HBase 2.0.0+ SQL connectors.
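
The version handling in the quoted hunk relies on trying one method signature and falling back to another when `NoSuchMethodException` is thrown. A minimal, self-contained sketch of that pattern follows. `SignatureFallback`, `OldApi`, and `NewApi` are hypothetical stand-ins and not HBase classes; the real code looks up `TokenUtil.obtainToken` instead.

```java
import java.lang.reflect.Method;

class SignatureFallback {
    /** Hypothetical stand-in for an older client whose method accepts a String. */
    static class OldApi {
        public static String obtain(String source) {
            return "old:" + source;
        }
    }

    /** Hypothetical stand-in for a newer client whose method only accepts a CharSequence. */
    static class NewApi {
        public static String obtain(CharSequence source) {
            return "new:" + source;
        }
    }

    /** Tries obtain(String) first and falls back to obtain(CharSequence) if it is missing. */
    static Object obtainWithFallback(Class<?> api, String arg) {
        try {
            Method method;
            try {
                // getMethod matches parameter types exactly, so this fails on NewApi.
                method = api.getMethod("obtain", String.class);
            } catch (NoSuchMethodException e) {
                method = api.getMethod("obtain", CharSequence.class);
            }
            return method.invoke(null, arg);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not call obtain(...) on " + api.getName(), e);
        }
    }
}
```

In this sketch only the method lookup is inside the inner `try`, so an exception thrown while invoking the first signature cannot be mistaken for a missing method and trigger the fallback.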





[GitHub] [flink] JackWangCS commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
JackWangCS commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917878427


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration hbaseConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            hbaseConf =
+                    (org.apache.hadoop.conf.Configuration)
+                            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                                    .getMethod("create", org.apache.hadoop.conf.Configuration.class)

Review Comment:
   Yes, if we decide to drop support for versions below 0.90.0, we can consider removing the reflection part.
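
For context on "the reflection part": the provider calls HBase classes by name so that the runtime module does not need HBase on its compile-time classpath. A minimal sketch of that optional-dependency pattern, assuming a public static one-argument method, could look like this. `OptionalDependencyCall` and `invokeStaticIfPresent` are hypothetical names, and a JDK class is used as the target only so the example runs without HBase.

```java
import java.lang.reflect.Method;
import java.util.Optional;

class OptionalDependencyCall {
    /**
     * Invokes a public static one-argument method by name. Returns Optional.empty() when
     * the class or method is absent, or when the reflective call itself fails.
     */
    static Optional<Object> invokeStaticIfPresent(
            String className, String methodName, Class<?> paramType, Object arg) {
        try {
            Method method = Class.forName(className).getMethod(methodName, paramType);
            return Optional.ofNullable(method.invoke(null, arg));
        } catch (ReflectiveOperationException e) {
            // Covers ClassNotFoundException, NoSuchMethodException,
            // IllegalAccessException and InvocationTargetException.
            return Optional.empty();
        }
    }
}
```

For example, `invokeStaticIfPresent("java.lang.Integer", "valueOf", String.class, "42")` yields a present value, while a class name that is not on the classpath yields `Optional.empty()`. If the dependency became mandatory, the same call could be written directly and checked by the compiler.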





[GitHub] [flink] flinkbot commented on pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20206:
URL: https://github.com/apache/flink/pull/20206#issuecomment-1178425468

   ## CI report:
   
   * 9ccb31920dd2de1f7c4aedce50fd1e194fc5ac80 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] JackWangCS commented on pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
JackWangCS commented on PR #20206:
URL: https://github.com/apache/flink/pull/20206#issuecomment-1186667143

   Hi @gaborgsomogyi, I found some issues with the KerberosDelegationTokenManager while testing the HBaseDelegationTokenProvider. The tokens obtained by KerberosDelegationTokenManager could not be renewed, which caused the application submission to fail.
   You can find more logs at: https://gist.github.com/JackWangCS/0b1ec2c1137c686ab874124569063234.
   
   I have already tested that the HBaseDelegationTokenProvider obtains the HBase delegation token, but I need more time to test the renewal part.




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r928147951


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration hbaseConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            hbaseConf =
+                    (org.apache.hadoop.conf.Configuration)
+                            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                                    .getMethod("create", org.apache.hadoop.conf.Configuration.class)
+                                    .invoke(null, hadoopConf);
+            // ----
+
+        } catch (InvocationTargetException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | ClassNotFoundException e) {
+            LOG.info(
+                    "HBase is not available (not packaged with this application): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+        return hbaseConf;
+    }
+
+    @Override
+    public boolean delegationTokensRequired() throws Exception {
+        try {
+            if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+                return false;
+            }
+        } catch (IOException e) {
+            LOG.debug("Hadoop Kerberos is not enabled.");
+            return false;
+        }
+        return Objects.nonNull(hbaseConf)
+                && hbaseConf.get("hbase.security.authentication").equals("kerberos");
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
+        Token<?> token;
+        try {
+            Preconditions.checkNotNull(hbaseConf);
+            try {
+                LOG.info("Obtaining Kerberos security token for HBase");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(conf);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod(
+                                                "obtainToken",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+            } catch (NoSuchMethodException e) {
+                // for HBase 2
+
+                // ----
+                // Intended call: ConnectionFactory connectionFactory =
+                // ConnectionFactory.createConnection(conf);
+                Closeable connectionFactory =
+                        (Closeable)
+                                Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
+                                        .getMethod(
+                                                "createConnection",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+                // ----
+                Class<?> connectionClass =
+                        Class.forName("org.apache.hadoop.hbase.client.Connection");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(connectionFactory);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod("obtainToken", connectionClass)
+                                        .invoke(null, connectionFactory);
+                if (null != connectionFactory) {
+                    connectionFactory.close();
+                }
+            }
+            if (token == null) {
+                LOG.error("No Kerberos security token for HBase available");
+            } else {
+                credentials.addToken(token.getService(), token);
+                LOG.info("Added HBase Kerberos security token to credentials.");
+            }
+        } catch (ClassNotFoundException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | InvocationTargetException
+                | IOException e) {
+            LOG.info(
+                    "HBase is not available (failed to obtain delegation tokens): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+
+        // HBase does not support to renew the delegation token currently
+        // https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+HBaseTokenAuthentication
+        return Optional.empty();

Review Comment:
   Thinking about this some more: it's clear that the token cannot be renewed, which is fine. However, if we return `Optional.empty()` here, Flink will never re-obtain the HBase token when this is the only active token provider. I think it would be good to return the expiry date field if it's filled, since we are re-obtaining the token rather than renewing it.
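
A minimal sketch of what this suggestion could look like, under stated assumptions: `TokenExpirySketch` and `nextObtainHint` are hypothetical names, and the expiration timestamp is assumed to have already been read from the token's identifier. How that field is decoded from an HBase token is not shown, and treating a non-positive value as "unset" is an assumption of this sketch.

```java
import java.util.Optional;

/** Hypothetical helper for illustration; not part of Flink or HBase. */
final class TokenExpirySketch {

    /**
     * Maps the expiration timestamp of a freshly obtained token to the value a
     * provider could return, so that the caller can schedule the next obtain.
     *
     * @param expirationMillis expiry as epoch millis; non-positive is treated as unset (assumption)
     */
    static Optional<Long> nextObtainHint(long expirationMillis) {
        // An unset field carries no scheduling information, so report nothing.
        return expirationMillis > 0 ? Optional.of(expirationMillis) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(nextObtainHint(0L));
        System.out.println(nextObtainHint(1_700_000_000_000L));
    }
}
```

With a value like this in place of the unconditional `Optional.empty()`, a caller has a concrete timestamp to schedule the next obtain against, even when no other provider reports one.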





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917747372


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration hbaseConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            hbaseConf =
+                    (org.apache.hadoop.conf.Configuration)
+                            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                                    .getMethod("create", org.apache.hadoop.conf.Configuration.class)

Review Comment:
   This doesn't need to change now, so I'm just asking: why do we need reflection here?
   `HBaseConfiguration.create` was added in version 0.90.0, which is an ancient version given that the latest is 3.1.3.
   All in all, if we only support versions where the API is guaranteed to exist, then we can drop the reflection in another PR, right?
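
For context, the log message in the diff ("not packaged with this application") suggests the reflection is there so the provider still loads when HBase is absent from the classpath, independent of which versions have the method. A minimal sketch of that pattern, using only the JDK; `OptionalDependencySketch` and `invokeStaticIfPresent` are hypothetical names, not Flink APIs:

```java
import java.lang.reflect.Method;
import java.util.Optional;

/** Hypothetical helper for illustration; not part of Flink. */
final class OptionalDependencySketch {

    /**
     * Reflectively invokes a public static one-argument method.
     * Returns empty if the class or method is missing, or if the call itself fails.
     */
    static Optional<Object> invokeStaticIfPresent(
            String className, String methodName, Class<?> argType, Object arg) {
        try {
            Method method = Class.forName(className).getMethod(methodName, argType);
            return Optional.ofNullable(method.invoke(null, arg));
        } catch (ReflectiveOperationException e) {
            // Covers ClassNotFoundException, NoSuchMethodException,
            // IllegalAccessException and InvocationTargetException.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Present on every JDK: resolves and invokes Integer.valueOf(String).
        System.out.println(invokeStaticIfPresent("java.lang.Integer", "valueOf", String.class, "42"));
        // Absent unless HBase is on the classpath; the argument type here is a placeholder.
        System.out.println(
                invokeStaticIfPresent(
                        "org.apache.hadoop.hbase.HBaseConfiguration", "create", Object.class, null));
    }
}
```

A direct call to `HBaseConfiguration.create` would instead make HBase a compile-time dependency of the module, so dropping the reflection is as much a packaging decision as a version-support one.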





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917739670


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {

Review Comment:
   OK, then we can keep it like this.





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #20206:
URL: https://github.com/apache/flink/pull/20206#discussion_r917761214


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public void init(Configuration configuration) throws Exception {
+        hbaseConf = getHBaseConfiguration(configuration);
+    }
+
+    private org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration conf) {
+        org.apache.hadoop.conf.Configuration hbaseConf = null;
+        try {
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    HadoopUtils.getHadoopConfiguration(conf);
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            hbaseConf =
+                    (org.apache.hadoop.conf.Configuration)
+                            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                                    .getMethod("create", org.apache.hadoop.conf.Configuration.class)
+                                    .invoke(null, hadoopConf);
+            // ----
+
+        } catch (InvocationTargetException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | ClassNotFoundException e) {
+            LOG.info(
+                    "HBase is not available (not packaged with this application): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+        return hbaseConf;
+    }
+
+    @Override
+    public boolean delegationTokensRequired() throws Exception {
+        try {
+            if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) {
+                return false;
+            }
+        } catch (IOException e) {
+            LOG.debug("Hadoop Kerberos is not enabled.");
+            return false;
+        }
+        return Objects.nonNull(hbaseConf)
+                && hbaseConf.get("hbase.security.authentication").equals("kerberos");
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception {
+        Token<?> token;
+        try {
+            Preconditions.checkNotNull(hbaseConf);
+            try {
+                LOG.info("Obtaining Kerberos security token for HBase");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(conf);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod(
+                                                "obtainToken",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+            } catch (NoSuchMethodException e) {
+                // for HBase 2
+
+                // ----
+                // Intended call: ConnectionFactory connectionFactory =
+                // ConnectionFactory.createConnection(conf);
+                Closeable connectionFactory =
+                        (Closeable)
+                                Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
+                                        .getMethod(
+                                                "createConnection",
+                                                org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+                // ----
+                Class<?> connectionClass =
+                        Class.forName("org.apache.hadoop.hbase.client.Connection");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(connectionFactory);
+                token =
+                        (Token<?>)
+                                Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod("obtainToken", connectionClass)
+                                        .invoke(null, connectionFactory);
+                if (null != connectionFactory) {
+                    connectionFactory.close();
+                }
+            }
+            if (token == null) {
+                LOG.error("No Kerberos security token for HBase available");
+            } else {
+                credentials.addToken(token.getService(), token);
+                LOG.info("Added HBase Kerberos security token to credentials.");
+            }
+        } catch (ClassNotFoundException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | InvocationTargetException
+                | IOException e) {
+            LOG.info(
+                    "HBase is not available (failed to obtain delegation tokens): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+
+        // HBase does not support to renew the delegation token currently
+        // https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+HBaseTokenAuthentication
+        return Optional.empty();

Review Comment:
   I've read this documentation, and it's fine that HBase does not renew the token, but AFAIU HBase can tell when Flink needs to obtain a new token, right? More specifically, if the expiry date is filled in properly, then the provider can vote on when the re-obtain must happen.
   ![image](https://user-images.githubusercontent.com/18561820/178240748-64831081-67c9-4a61-baf4-2132552c7d29.png)
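
To illustrate the "voting" idea, here is a rough sketch of how a caller could combine the optional expiry values reported by several providers. All names are hypothetical, the ratio of 0.75 is only an illustrative value, and this is not a description of how Flink's token manager is actually implemented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical scheduling helper for illustration; not part of Flink. */
final class ReobtainScheduleSketch {

    /** Earliest expiry among the providers that reported one; empty if none did. */
    static Optional<Long> earliestExpiry(List<Optional<Long>> providerExpiries) {
        return providerExpiries.stream()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .min(Long::compare);
    }

    /** Delay until the next obtain: a fraction of the remaining lifetime, never negative. */
    static long reobtainDelayMillis(long nowMillis, long expiryMillis, double ratio) {
        long remaining = Math.max(0L, expiryMillis - nowMillis);
        return (long) (remaining * ratio);
    }

    public static void main(String[] args) {
        List<Optional<Long>> votes =
                Arrays.asList(Optional.<Long>empty(), Optional.of(5_000L), Optional.of(9_000L));
        // Providers that return empty do not take part in the vote.
        earliestExpiry(votes)
                .ifPresent(expiry -> System.out.println(reobtainDelayMillis(1_000L, expiry, 0.75)));
    }
}
```

In this sketch a provider that always returns `Optional.empty()` never influences the schedule, so if it is the only provider, nothing is ever scheduled.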
   





[GitHub] [flink] gaborgsomogyi commented on pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #20206:
URL: https://github.com/apache/flink/pull/20206#issuecomment-1178844673

   cc @mbalassi @gyfora I intend to take care of the review/testing part, and would only later like to ask for help with the merge.

