Posted to commits@hbase.apache.org by st...@apache.org on 2021/02/01 21:10:30 UTC

[hbase-connectors] branch master updated: HBASE-25326 Allow running and building with Apache Spark 3.0 (#75)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c46a24  HBASE-25326 Allow running and building with Apache Spark 3.0 (#75)
4c46a24 is described below

commit 4c46a24ab3a12be648bf7ed30727e82416327ec8
Author: Luca Canali <lu...@cern.ch>
AuthorDate: Mon Feb 1 22:10:21 2021 +0100

    HBASE-25326 Allow running and building with Apache Spark 3.0 (#75)
    
    Signed-off-by: Wellington Chevreuil <we...@gmail.com>
    Signed-off-by: Josh Elser <el...@apache.org>
    Signed-off-by: Michael Stack <st...@apache.org>
    Signed-off-by: Mate Szalay-Beko <sy...@apache.org>
---
 spark/README.md                                                  | 9 ++++++++-
 .../main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala  | 8 +++++---
 .../scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala  | 2 +-
 3 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/spark/README.md b/spark/README.md
index f0199e2..a3d823c 100755
--- a/spark/README.md
+++ b/spark/README.md
@@ -26,4 +26,11 @@ To generate an artifact for a different [spark version](https://mvnrepository.co
 $ mvn -Dspark.version=2.2.2 -Dscala.version=2.11.7 -Dscala.binary.version=2.11 clean install
 ```
 
-See above linked spark version to match spark version and supported scala version.
+---
+To build the connector for Spark 3.0, compile it with Scala 2.12.
+You can also customize the Spark, HBase, and Hadoop versions.
+Example:
+
+```
+$ mvn -Dspark.version=3.0.1 -Dscala.version=2.12.10 -Dscala.binary.version=2.12 -Dhbase.version=2.2.4 -Dhadoop.profile=3.0 -Dhadoop-three.version=3.2.0 -DskipTests clean package
+```
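
Editor's note (not part of the commit): once built as above, the resulting jar can be handed to a Spark 3 session for a quick smoke test. The path follows standard Maven layout and `<version>` is a placeholder, both illustrative:

```
$ spark-shell --jars spark/hbase-spark/target/hbase-spark-<version>.jar
```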
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 80c415c..db5cda0 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.mapred.JobConf
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
@@ -436,7 +435,10 @@ class HBaseContext(@transient val sc: SparkContext,
       classOf[IdentityTableMapper], null, null, job)
 
     val jconf = new JobConf(job.getConfiguration)
-    SparkHadoopUtil.get.addCredentials(jconf)
+    val jobCreds = jconf.getCredentials()
+    UserGroupInformation.setConfiguration(sc.hadoopConfiguration)
+    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+
     new NewHBaseRDD(sc,
       classOf[TableInputFormat],
       classOf[ImmutableBytesWritable],
@@ -484,7 +486,7 @@ class HBaseContext(@transient val sc: SparkContext,
   Configuration = {
 
     if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
-      val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)
+      val fs = FileSystem.newInstance(sc.hadoopConfiguration)
       val inputStream = fs.open(new Path(tmpHdfsConfgFile))
       tmpHdfsConfiguration = new Configuration(false)
       tmpHdfsConfiguration.readFields(inputStream)
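
Editor's note (not part of the commit): the two hunks above remove the dependency on Spark's internal `org.apache.spark.deploy.SparkHadoopUtil`, which external code can no longer rely on under Spark 3.0, and instead read credentials and configuration directly from Hadoop's `UserGroupInformation` and `sc.hadoopConfiguration`. A minimal self-contained sketch of the same credential-merging pattern; the helper name `addCurrentUserCredentials` is ours, not from the commit:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation

// Merge the current user's Hadoop credentials (e.g. delegation tokens)
// into a JobConf, mirroring what SparkHadoopUtil.get.addCredentials(jconf)
// did before this change.
def addCurrentUserCredentials(jconf: JobConf, hadoopConf: Configuration): Unit = {
  UserGroupInformation.setConfiguration(hadoopConf)
  jconf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}
```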
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
index 1e50585..6b96bcc 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
@@ -56,7 +56,7 @@ object Utils {
         case DoubleType => Bytes.toDouble(src, offset)
         case DateType => new Date(Bytes.toLong(src, offset))
         case TimestampType => new Timestamp(Bytes.toLong(src, offset))
-        case StringType => UTF8String.fromBytes(src, offset, length)
+        case StringType => Bytes.toString(src, offset, length)
         case BinaryType =>
           val newArray = new Array[Byte](length)
           System.arraycopy(src, offset, newArray, 0, length)
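
Editor's note (not part of the commit): the `StringType` case above swaps Spark's internal `org.apache.spark.unsafe.types.UTF8String` for HBase's `Bytes.toString`, returning a plain `java.lang.String` and avoiding a Spark-internal class when decoding cells. A small standalone sketch of the decoding call; the sample bytes, offset, and length are ours, for illustration only:

```scala
import org.apache.hadoop.hbase.util.Bytes

// Decode a slice of an HBase-encoded byte array as a String,
// as the StringType case in object Utils now does.
val src: Array[Byte] = Bytes.toBytes("row1-colA-hello")
val offset = 10 // illustrative: start of the value "hello"
val length = 5
val decoded: String = Bytes.toString(src, offset, length)
assert(decoded == "hello")
```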