Posted to commits@hbase.apache.org by ta...@apache.org on 2022/01/13 19:08:45 UTC

[hbase-connectors] branch master updated: [HBASE-26534] Update HBase version to 2.4, make Hadoop 3 and Spark 3 default versions. (#88)

This is an automated email from the ASF dual-hosted git repository.

taklwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 7578a33  [HBASE-26534] Update HBase version to 2.4, make Hadoop 3 and Spark 3 default versions. (#88)
7578a33 is described below

commit 7578a3316d70101ad02048544deb24ef5a4cd060
Author: Luca Canali <lu...@cern.ch>
AuthorDate: Thu Jan 13 20:08:41 2022 +0100

    [HBASE-26534] Update HBase version to 2.4, make Hadoop 3 and Spark 3 default versions. (#88)
    
    Signed-off-by: Tak Lon (Stephen) Wu <ta...@apache.org>
    Reviewed-by: Ian A Wilson
---
 .../hadoop/hbase/kafka/KafkaTableForBridge.java    |  6 +++++
 pom.xml                                            | 18 +++++++--------
 spark/README.md                                    | 26 +++++++++++++---------
 spark/hbase-spark/pom.xml                          | 20 ++++++++---------
 .../apache/hadoop/hbase/spark/HBaseContext.scala   | 19 +++++++++++-----
 .../hadoop/hbase/spark/TestJavaHBaseContext.java   | 10 ++++-----
 .../apache/hadoop/hbase/spark/BulkLoadSuite.scala  |  8 +++----
 spark/pom.xml                                      |  9 ++++----
 8 files changed, 66 insertions(+), 50 deletions(-)

diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
index 25e4796..31baa44 100755
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -69,6 +70,11 @@ public class KafkaTableForBridge implements Table {
     List<String> topics = new ArrayList<>();
   }
 
+  @Override
+  public RegionLocator getRegionLocator() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
   public KafkaTableForBridge(TableName tableName,
                  Configuration conf,
                  TopicRoutingRules routingRules,
diff --git a/pom.xml b/pom.xml
index 4d70408..034bff2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,20 +129,20 @@
     <compileSource>1.8</compileSource>
     <java.min.version>${compileSource}</java.min.version>
     <maven.min.version>3.5.0</maven.min.version>
-    <hbase.version>2.2.2</hbase.version>
+    <hbase.version>2.4.9</hbase.version>
     <exec.maven.version>1.6.0</exec.maven.version>
     <audience-annotations.version>0.5.0</audience-annotations.version>
     <junit.version>4.12</junit.version>
-    <hbase-thirdparty.version>2.2.1</hbase-thirdparty.version>
+    <hbase-thirdparty.version>4.0.1</hbase-thirdparty.version>
     <hadoop-two.version>2.8.5</hadoop-two.version>
-    <hadoop-three.version>3.0.3</hadoop-three.version>
-    <hadoop.version>${hadoop-two.version}</hadoop.version>
+    <hadoop-three.version>3.2.0</hadoop-three.version>
+    <hadoop.version>${hadoop-three.version}</hadoop.version>
     <slf4j.version>1.7.25</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
-    <checkstyle.version>8.18</checkstyle.version>
-    <maven.checkstyle.version>3.1.0</maven.checkstyle.version>
-    <surefire.version>3.0.0-M4</surefire.version>
-    <enforcer.version>3.0.0-M3</enforcer.version>
+    <checkstyle.version>8.28</checkstyle.version>
+    <maven.checkstyle.version>3.1.2</maven.checkstyle.version>
+    <surefire.version>3.0.0-M5</surefire.version>
+    <enforcer.version>3.0.0</enforcer.version>
     <extra.enforcer.version>1.2</extra.enforcer.version>
     <restrict-imports.enforcer.version>0.14.0</restrict-imports.enforcer.version>
     <!--Internally we use a different version of protobuf. See hbase-protocol-shaded-->
@@ -153,7 +153,7 @@
     <commons-lang3.version>3.6</commons-lang3.version>
     <!--This property is for hadoops netty. HBase netty
          comes in via hbase-thirdparty hbase-shaded-netty-->
-    <netty.hadoop.version>3.6.2.Final</netty.hadoop.version>
+    <netty.hadoop.version>3.10.6.Final</netty.hadoop.version>
     <os.maven.version>1.6.1</os.maven.version>
     <glassfish.el.version>3.0.1-b08</glassfish.el.version>
     <compat.module>hbase-hadoop2-compat</compat.module>
diff --git a/spark/README.md b/spark/README.md
index a3d823c..2dd90eb 100755
--- a/spark/README.md
+++ b/spark/README.md
@@ -18,19 +18,25 @@ limitations under the License.
 
 # Apache HBase&trade; Spark Connector
 
-## Scala and Spark Versions
+## Spark, Scala and Configurable Options
 
-To generate an artifact for a different [spark version](https://mvnrepository.com/artifact/org.apache.spark/spark-core) and/or [scala version](https://www.scala-lang.org/download/all.html), pass command-line options as follows (changing version numbers appropriately):
+To generate an artifact for a different [Spark version](https://mvnrepository.com/artifact/org.apache.spark/spark-core), [Scala version](https://www.scala-lang.org/download/all.html),
+[Hadoop version](https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core), or [HBase version](https://mvnrepository.com/artifact/org.apache.hbase/hbase), pass command-line options as follows (changing the version numbers appropriately):
 
 ```
-$ mvn -Dspark.version=2.2.2 -Dscala.version=2.11.7 -Dscala.binary.version=2.11 clean install
+$ mvn -Dspark.version=3.1.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.8 clean install
 ```
 
----
-To build the connector with Spark 3.0, compile it with scala 2.12.
-Additional configurations that you can customize are the Spark version, HBase version, and Hadoop version.
-Example:
+Note: to build the connector with Spark 2.x, compile it with `-Dscala.binary.version=2.11` and activate the Hadoop 2 profile with `-Dhadoop.profile=2.0`.
+
+## Configuration and Installation
+**Client-side** (Spark) configuration:
+- The HBase configuration file `hbase-site.xml` should be made available to Spark; it can be copied to `$SPARK_CONF_DIR` (the default is `$SPARK_HOME/conf`).
+
+**Server-side** (HBase region servers) configuration:
+- The following jars need to be in the CLASSPATH of the HBase region servers:
+  - scala-library, hbase-spark, and hbase-spark-protocol-shaded.
+- The server-side configuration is needed for column filter pushdown.
+  - If you cannot perform the server-side configuration, disable pushdown with `.option("hbase.spark.pushdown.columnfilter", false)`.
+- The Scala library version must match the Scala version (2.11 or 2.12) used for compiling the connector.
 
-```
-$ mvn -Dspark.version=3.0.1 -Dscala.version=2.12.10 -Dscala.binary.version=2.12 -Dhbase.version=2.2.4 -Dhadoop.profile=3.0 -Dhadoop-three.version=3.2.0 -DskipTests clean package
-```
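
For illustration, here is a minimal client-side Scala sketch of reading through the connector with column filter pushdown disabled, as described in the README change above. The format name `org.apache.hadoop.hbase.spark` and the `hbase.table`/`hbase.columns.mapping` option keys are assumptions drawn from the connector's usual DataSource usage and are not part of this patch; only the `hbase.spark.pushdown.columnfilter` option appears in the change itself.

```
import org.apache.spark.sql.SparkSession

// Sketch only: assumes hbase-site.xml has been copied to $SPARK_CONF_DIR
// as described in the README, and that the table "t1" with family "cf1"
// exists (both are hypothetical names used for illustration).
val spark = SparkSession.builder().appName("hbase-spark-read").getOrCreate()

val df = spark.read
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.table", "t1")
  .option("hbase.columns.mapping", "id STRING :key, col1 STRING cf1:col1")
  // Disable column filter pushdown when the server-side jars
  // (scala-library, hbase-spark, hbase-spark-protocol-shaded)
  // cannot be placed on the region server CLASSPATH.
  .option("hbase.spark.pushdown.columnfilter", false)
  .load()

df.show()
```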
diff --git a/spark/hbase-spark/pom.xml b/spark/hbase-spark/pom.xml
index 379b5db..26c5c70 100644
--- a/spark/hbase-spark/pom.xml
+++ b/spark/hbase-spark/pom.xml
@@ -274,14 +274,16 @@
         <skipTests>true</skipTests>
       </properties>
     </profile>
-    <!-- profile against Hadoop 2.x: This is the default. -->
+    <!--
+      profile for building against Hadoop 2.x. Activate using:
+       mvn -Dhadoop.profile=2.0
+    -->
     <profile>
       <id>hadoop-2.0</id>
       <activation>
         <property>
-          <!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
-          <!--h2-->
-          <name>!hadoop.profile</name>
+          <name>hadoop.profile</name>
+          <value>2.0</value>
         </property>
       </activation>
       <dependencies>
@@ -345,20 +347,16 @@
         </dependency>
       </dependencies>
     </profile>
-    <!--
-      profile for building against Hadoop 3.0.x. Activate using:
-       mvn -Dhadoop.profile=3.0
-    -->
+    <!-- profile against Hadoop 3.x: This is the default. -->
     <profile>
       <id>hadoop-3.0</id>
       <activation>
         <property>
-          <name>hadoop.profile</name>
-          <value>3.0</value>
+          <name>!hadoop.profile</name>
         </property>
       </activation>
       <properties>
-        <hadoop.version>3.0</hadoop.version>
+        <hadoop.version>${hadoop-three.version}</hadoop.version>
       </properties>
       <dependencies>
         <dependency>
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index db5cda0..ee860c4 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.io.compress.Compression
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
-import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
-import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.regionserver.{HStoreFile, StoreFileWriter, StoreUtils, BloomType}
+import org.apache.hadoop.hbase.util.{Bytes, ChecksumType}
 import org.apache.hadoop.mapred.JobConf
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
@@ -900,10 +900,19 @@ class HBaseContext(@transient val sc: SparkContext,
 
     val tempConf = new Configuration(conf)
     tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
+
+    // HBASE-25249 introduced an incompatible change in the IA.Private HStore and StoreUtils
+    // classes, so we read the checksum type and bytes-per-checksum directly from the
+    // Configuration to stay compatible with both HBase 2.3.x and 2.4.x.
     val contextBuilder = new HFileContextBuilder()
       .withCompression(Algorithm.valueOf(familyOptions.compression))
-      .withChecksumType(HStore.getChecksumType(conf))
-      .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+      // ChecksumType.nameToType is still an IA.Private utility, but it is unlikely to change.
+      .withChecksumType(ChecksumType
+        .nameToType(conf.get(HConstants.CHECKSUM_TYPE_NAME,
+          ChecksumType.getDefaultChecksumType.getName)))
+      .withCellComparator(CellComparator.getInstance())
+      .withBytesPerCheckSum(conf.getInt(HConstants.BYTES_PER_CHECKSUM,
+        HFile.DEFAULT_BYTES_PER_CHECKSUM))
       .withBlockSize(familyOptions.blockSize)
 
     if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
@@ -919,7 +928,7 @@ class HBaseContext(@transient val sc: SparkContext,
     new WriterLength(0,
       new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
         .withBloomType(BloomType.valueOf(familyOptions.bloomType))
-        .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
+        .withFileContext(hFileContext)
         .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
         .withFavoredNodes(favoredNodes).build())
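
As a standalone illustration of the compatibility approach taken in the hunk above, here is a minimal Scala sketch that resolves the checksum settings purely from the `Configuration`, using the same constants as the patch; the helper name `checksumSettings` is hypothetical.

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.io.hfile.HFile
import org.apache.hadoop.hbase.util.ChecksumType

// Minimal sketch: resolve the checksum type and bytes-per-checksum from the
// Configuration only, avoiding the IA.Private HStore/StoreUtils helpers so the
// same code works against both HBase 2.3.x and 2.4.x.
def checksumSettings(conf: Configuration): (ChecksumType, Int) = {
  val checksumType = ChecksumType.nameToType(
    conf.get(HConstants.CHECKSUM_TYPE_NAME,
      ChecksumType.getDefaultChecksumType.getName))
  val bytesPerChecksum =
    conf.getInt(HConstants.BYTES_PER_CHECKSUM, HFile.DEFAULT_BYTES_PER_CHECKSUM)
  (checksumType, bytesPerChecksum)
}
```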
 
diff --git a/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
index 61ada1d..793ed8e 100644
--- a/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
+++ b/spark/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
@@ -95,8 +95,7 @@ public class TestJavaHBaseContext implements Serializable {
 
     LOG.info("starting minicluster");
 
-    TEST_UTIL.startMiniZKCluster();
-    TEST_UTIL.startMiniHBaseCluster(1, 1);
+    TEST_UTIL.startMiniCluster();
 
     LOG.info(" - minicluster started");
   }
@@ -104,8 +103,7 @@ public class TestJavaHBaseContext implements Serializable {
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     LOG.info("shuting down minicluster");
-    TEST_UTIL.shutdownMiniHBaseCluster();
-    TEST_UTIL.shutdownMiniZKCluster();
+    TEST_UTIL.shutdownMiniCluster();
     LOG.info(" - minicluster shut down");
     TEST_UTIL.cleanupTestDir();
 
@@ -284,8 +282,8 @@ public class TestJavaHBaseContext implements Serializable {
 
     final JavaRDD<String> stringJavaRDD =
             HBASE_CONTEXT.bulkGet(TableName.valueOf(tableName), 2, rdd,
-            new GetFunction(),
-            new ResultFunction());
+              new GetFunction(),
+              new ResultFunction());
 
     Assert.assertEquals(stringJavaRDD.count(), 5);
   }
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
index dc328f3..59435bb 100644
--- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
@@ -391,7 +391,7 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     for ( i <- 0 until f1FileList.length) {
       val reader = HFile.createReader(fs, f1FileList(i).getPath,
         new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("gz"))
+      assert(reader.getTrailer.getCompressionCodec().getName.equals("gz"))
       assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
     }
 
@@ -401,7 +401,7 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     for ( i <- 0 until f2FileList.length) {
       val reader = HFile.createReader(fs, f2FileList(i).getPath,
         new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("none"))
+      assert(reader.getTrailer.getCompressionCodec().getName.equals("none"))
       assert(reader.getDataBlockEncoding.name().equals("NONE"))
     }
 
@@ -870,7 +870,7 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     for ( i <- 0 until f1FileList.length) {
       val reader = HFile.createReader(fs, f1FileList(i).getPath,
         new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("gz"))
+      assert(reader.getTrailer.getCompressionCodec().getName.equals("gz"))
       assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
     }
 
@@ -880,7 +880,7 @@ BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
     for ( i <- 0 until f2FileList.length) {
       val reader = HFile.createReader(fs, f2FileList(i).getPath,
         new CacheConfig(config), true, config)
-      assert(reader.getCompressionAlgorithm.getName.equals("none"))
+      assert(reader.getTrailer.getCompressionCodec().getName.equals("none"))
       assert(reader.getDataBlockEncoding.name().equals("NONE"))
     }
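
The assertions above now read the compression codec from the HFile's fixed file trailer instead of `reader.getCompressionAlgorithm`. Here is a minimal Scala sketch of that inspection, using only the calls that appear in the patch; the helper name `describeHFile` is hypothetical.

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile}

// Minimal sketch: open an HFile and report its compression codec and data
// block encoding via the trailer, mirroring the updated test assertions.
def describeHFile(fs: FileSystem, path: Path, config: Configuration): (String, String) = {
  val reader = HFile.createReader(fs, path, new CacheConfig(config), true, config)
  try {
    (reader.getTrailer.getCompressionCodec().getName,
     reader.getDataBlockEncoding.name())
  } finally {
    reader.close()
  }
}
```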
 
diff --git a/spark/pom.xml b/spark/pom.xml
index c65670c..050d35a 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -44,13 +44,12 @@
 
   <properties>
     <protobuf.plugin.version>0.6.1</protobuf.plugin.version>
-    <hbase-thirdparty.version>2.1.0</hbase-thirdparty.version>
-    <jackson.version>2.9.10</jackson.version>
-    <spark.version>2.4.0</spark.version>
+    <jackson.version>2.12.5</jackson.version>
+    <spark.version>3.1.2</spark.version>
     <!-- The following version is in sync with Spark's choice
          Please take caution when this version is modified -->
-    <scala.version>2.11.12</scala.version>
-    <scala.binary.version>2.11</scala.binary.version>
+    <scala.version>2.12.10</scala.version>
+    <scala.binary.version>2.12</scala.binary.version>
   </properties>
 
   <dependencyManagement>