You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2021/02/01 21:10:30 UTC
[hbase-connectors] branch master updated: HBASE-25326 Allow running
and building with Apache Spark 3.0 (#75)
This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new 4c46a24 HBASE-25326 Allow running and building with Apache Spark 3.0 (#75)
4c46a24 is described below
commit 4c46a24ab3a12be648bf7ed30727e82416327ec8
Author: Luca Canali <lu...@cern.ch>
AuthorDate: Mon Feb 1 22:10:21 2021 +0100
HBASE-25326 Allow running and building with Apache Spark 3.0 (#75)
Signed-off-by: Wellington Chevreuil <we...@gmail.com>
Signed-off-by: Josh Elser <el...@apache.org>
Signed-off-by: Michael Stack <st...@apache.org>
Signed-off-by: Mate Szalay-Beko <sy...@apache.org>
---
spark/README.md | 9 ++++++++-
.../main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala | 8 +++++---
.../scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala | 2 +-
3 files changed, 14 insertions(+), 5 deletions(-)
diff --git a/spark/README.md b/spark/README.md
index f0199e2..a3d823c 100755
--- a/spark/README.md
+++ b/spark/README.md
@@ -26,4 +26,11 @@ To generate an artifact for a different [spark version](https://mvnrepository.co
$ mvn -Dspark.version=2.2.2 -Dscala.version=2.11.7 -Dscala.binary.version=2.11 clean install
```
-See above linked spark version to match spark version and supported scala version.
+---
+To build the connector with Spark 3.0, compile it with scala 2.12.
+Additional configurations that you can customize are the Spark version, HBase version, and Hadoop version.
+Example:
+
+```
+$ mvn -Dspark.version=3.0.1 -Dscala.version=2.12.10 -Dscala.binary.version=2.12 -Dhbase.version=2.2.4 -Dhadoop.profile=3.0 -Dhadoop-three.version=3.2.0 -DskipTests clean package
+```
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 80c415c..db5cda0 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
@@ -436,7 +435,10 @@ class HBaseContext(@transient val sc: SparkContext,
classOf[IdentityTableMapper], null, null, job)
val jconf = new JobConf(job.getConfiguration)
- SparkHadoopUtil.get.addCredentials(jconf)
+ val jobCreds = jconf.getCredentials()
+ UserGroupInformation.setConfiguration(sc.hadoopConfiguration)
+ jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+
new NewHBaseRDD(sc,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
@@ -484,7 +486,7 @@ class HBaseContext(@transient val sc: SparkContext,
Configuration = {
if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
- val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)
+ val fs = FileSystem.newInstance(sc.hadoopConfiguration)
val inputStream = fs.open(new Path(tmpHdfsConfgFile))
tmpHdfsConfiguration = new Configuration(false)
tmpHdfsConfiguration.readFields(inputStream)
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
index 1e50585..6b96bcc 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
@@ -56,7 +56,7 @@ object Utils {
case DoubleType => Bytes.toDouble(src, offset)
case DateType => new Date(Bytes.toLong(src, offset))
case TimestampType => new Timestamp(Bytes.toLong(src, offset))
- case StringType => UTF8String.fromBytes(src, offset, length)
+ case StringType => Bytes.toString(src, offset, length)
case BinaryType =>
val newArray = new Array[Byte](length)
System.arraycopy(src, offset, newArray, 0, length)