Posted to commits@spark.apache.org by va...@apache.org on 2019/01/28 18:08:34 UTC

[spark] branch master updated: [SPARK-26432][CORE] Obtain HBase delegation token operation compatible with HBase 2.x.x version API

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dfed439  [SPARK-26432][CORE] Obtain HBase delegation token operation compatible with HBase 2.x.x version API
dfed439 is described below

commit dfed439e33b7bf224dd412b0960402068d961c7b
Author: s71955 <su...@gmail.com>
AuthorDate: Mon Jan 28 10:08:23 2019 -0800

    [SPARK-26432][CORE] Obtain HBase delegation token operation compatible with HBase 2.x.x version API
    
    ## What changes were proposed in this pull request?
    
    While obtaining a token from the HBase service, Spark uses a deprecated HBase API:
    ```public static Token<AuthenticationTokenIdentifier> obtainToken(Configuration conf)```
    This deprecated API has already been removed in HBase 2.x as part of the major release (https://issues.apache.org/jira/browse/HBASE-14713).
    A more stable API is available in the TokenUtil class:
    ```public static Token<AuthenticationTokenIdentifier> obtainToken(Connection conn)```
    Spark should use this stable API to get the delegation token.
    
    To invoke this API, a Connection object must first be retrieved from ConnectionFactory; the same connection is then passed to obtainToken(Connection conn) to get the token.
    E.g., call ```public static Connection createConnection(Configuration conf)```,
    then call ```public static Token<AuthenticationTokenIdentifier> obtainToken(Connection conn)```.
    
    ## How was this patch tested?
    Manual testing was done.
    Manual test result:
    Before fix:
    
    ![hbase-dep-obtaintok 1](https://user-images.githubusercontent.com/12999161/50699264-64cac200-106d-11e9-81b4-e50ae8097f27.png)
    
    After fix:
    1. Create 2 tables in hbase shell
     >Launch hbase shell
     >Enter commands to create tables and load data
        create 'table1','cf'
        put 'table1','row1','cf:cid','20'
    
        create 'table2','cf'
        put 'table2','row1','cf:cid','30'
    
     >Show values command
       get 'table1','row1','cf:cid'  will display value as 20
       get 'table2','row1','cf:cid'  will display value as 30
    
    2. Run SparkHbasetoHbase class in testSpark.jar using spark-submit
    
    spark-submit --master yarn-cluster --class com.mrs.example.spark.SparkHbasetoHbase --conf "spark.yarn.security.credentials.hbase.enabled"="true" --conf "spark.security.credentials.hbase.enabled"="true" --keytab /opt/client/user.keytab --principal sen testSpark.jar
    
    The SparkHbasetoHbase test class will update the value of table2 with sum of values of table1 & table2.
    
    table2 = table1+table2
    As the screenshot shows, the Spark job was able to interact with the HBase service and update the row value.
    ![obtaintok_success 1](https://user-images.githubusercontent.com/12999161/50699393-bd9a5a80-106d-11e9-96c6-6c250d561efa.png)
    
    Closes #23429 from sujith71955/master_hbase_service.
    
    Authored-by: s71955 <su...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../security/HBaseDelegationTokenProvider.scala    | 56 ++++++++++++++++++++--
 1 file changed, 52 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
index 3bf8c14..e345b0b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy.security
 
+import java.io.Closeable
+
 import scala.reflect.runtime.universe
 import scala.util.control.NonFatal
 
@@ -42,8 +44,8 @@ private[security] class HBaseDelegationTokenProvider
     try {
       val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
       val obtainToken = mirror.classLoader.
-        loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
-        getMethod("obtainToken", classOf[Configuration])
+        loadClass("org.apache.hadoop.hbase.security.token.TokenUtil")
+        .getMethod("obtainToken", classOf[Configuration])
 
       logDebug("Attempting to fetch HBase security token.")
       val token = obtainToken.invoke(null, hbaseConf(hadoopConf))
@@ -52,12 +54,58 @@ private[security] class HBaseDelegationTokenProvider
       creds.addToken(token.getService, token)
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Failed to get token from service $serviceName", e)
+        logWarning(s"Failed to get token from service $serviceName due to  " + e +
+          s" Retrying to fetch HBase security token with hbase connection parameter.")
+        // Seems to be spark is trying to get the token from HBase 2.x.x  version or above where the
+        // obtainToken(Configuration conf) API has been removed. Lets try obtaining the token from
+        // another compatible API of HBase service.
+        obtainDelegationTokensWithHBaseConn(hadoopConf, creds)
     }
-
     None
   }
 
+  /**
+   * Token<AuthenticationTokenIdentifier> obtainToken(Configuration conf) is a deprecated
+   * method and in Hbase 2.0.0 the method is already removed.
+   * The HBase client API used in below method is introduced from HBase 0.98.9 version,
+   * to invoke this api first connection object has to be retrieved from ConnectionFactory and the
+   * same connection can be passed to
+   * Token<AuthenticationTokenIdentifier> obtainToken(Connection conn) API
+   *
+   * @param hadoopConf Configuration of current Hadoop Compatible system.
+   * @param creds Credentials to add tokens and security keys to.
+   */
+  private def obtainDelegationTokensWithHBaseConn(
+      hadoopConf: Configuration,
+      creds: Credentials): Unit = {
+    var hbaseConnection : Closeable = null
+    try {
+      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+      val connectionFactoryClass = mirror.classLoader
+        .loadClass("org.apache.hadoop.hbase.client.ConnectionFactory")
+        .getMethod("createConnection", classOf[Configuration])
+      hbaseConnection = connectionFactoryClass.invoke(null, hbaseConf(hadoopConf))
+        .asInstanceOf[Closeable]
+      val connectionParamTypeClassRef = mirror.classLoader
+        .loadClass("org.apache.hadoop.hbase.client.Connection")
+      val obtainTokenMethod = mirror.classLoader
+        .loadClass("org.apache.hadoop.hbase.security.token.TokenUtil")
+        .getMethod("obtainToken", connectionParamTypeClassRef)
+      logDebug("Attempting to fetch HBase security token.")
+      val token = obtainTokenMethod.invoke(null, hbaseConnection)
+        .asInstanceOf[Token[_ <: TokenIdentifier]]
+      logInfo(s"Get token from HBase: ${token.toString}")
+      creds.addToken(token.getService, token)
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Failed to get token from service $serviceName", e)
+    } finally {
+      if (null != hbaseConnection) {
+        hbaseConnection.close()
+      }
+    }
+  }
+
   override def delegationTokensRequired(
       sparkConf: SparkConf,
       hadoopConf: Configuration): Boolean = {
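
As an illustration of the fallback in the diff above, here is a minimal, self-contained Java sketch; it is not the Spark code itself. The classes Conf, Conn, ConnFactory, OldTokenUtil and NewTokenUtil are hypothetical stand-ins for HBase's Configuration, Connection, ConnectionFactory and TokenUtil, so the sketch runs without HBase on the classpath. One simplification to note: the patch falls back on any non-fatal error, while this sketch falls back only when the Configuration-based overload is missing.

```java
import java.io.Closeable;
import java.lang.reflect.Method;

public class TokenFallbackSketch {

    // Hypothetical stand-in for org.apache.hadoop.conf.Configuration.
    public static class Conf {}

    // Hypothetical stand-in for an HBase Connection; records whether it was closed.
    public static class Conn implements Closeable {
        public boolean closed = false;
        @Override public void close() { closed = true; }
    }

    // Hypothetical stand-in for ConnectionFactory; remembers the last connection created.
    public static class ConnFactory {
        public static Conn last;
        public static Conn createConnection(Conf conf) {
            last = new Conn();
            return last;
        }
    }

    // Mimics an HBase 1.x style TokenUtil: both overloads are present.
    public static class OldTokenUtil {
        public static String obtainToken(Conf conf) { return "token-via-configuration"; }
        public static String obtainToken(Conn conn) { return "token-via-connection"; }
    }

    // Mimics an HBase 2.x style TokenUtil: only the Connection overload remains.
    public static class NewTokenUtil {
        public static String obtainToken(Conn conn) { return "token-via-connection"; }
    }

    // Try the Configuration-based overload reflectively; if it does not exist,
    // create a connection, use the Connection-based overload, and always close
    // the connection afterwards.
    public static String fetchToken(Class<?> tokenUtil, Conf conf) throws Exception {
        try {
            Method byConf = tokenUtil.getMethod("obtainToken", Conf.class);
            return (String) byConf.invoke(null, conf);
        } catch (NoSuchMethodException missingOverload) {
            Closeable conn = null;
            try {
                conn = (Closeable) ConnFactory.class
                    .getMethod("createConnection", Conf.class)
                    .invoke(null, conf);
                Method byConn = tokenUtil.getMethod("obtainToken", Conn.class);
                return (String) byConn.invoke(null, conn);
            } finally {
                if (conn != null) {
                    conn.close();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchToken(OldTokenUtil.class, new Conf())); // prints "token-via-configuration"
        System.out.println(fetchToken(NewTokenUtil.class, new Conf())); // prints "token-via-connection"
    }
}
```

Resolving both methods reflectively mirrors the patch's approach of not linking against a specific HBase version at compile time; the finally block corresponds to the patch closing the connection whether or not the token was obtained.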

