Posted to commits@hbase.apache.org by me...@apache.org on 2022/11/29 12:29:57 UTC

[hbase-connectors] branch master updated: HBASE-27397 Spark-hbase support for 'startWith' predicate (#105)

This is an automated email from the ASF dual-hosted git repository.

meszibalu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 707e1c5  HBASE-27397 Spark-hbase support for 'startWith' predicate (#105)
707e1c5 is described below

commit 707e1c584a5e1653fb9bfb24d7429c761982adf3
Author: richardantal <an...@gmail.com>
AuthorDate: Tue Nov 29 13:29:52 2022 +0100

    HBASE-27397 Spark-hbase support for 'startWith' predicate (#105)
    
    Signed-off-by: Balazs Meszaros <me...@apache.org>
---
 .../apache/hadoop/hbase/spark/DefaultSource.scala  |  12 +++
 .../hbase/spark/DynamicLogicExpression.scala       |  21 +++++
 .../spark/datasources/HBaseTableScanRDD.scala      |   4 +
 .../hadoop/hbase/spark/datasources/Utils.scala     |  32 +++++++
 .../hadoop/hbase/spark/DefaultSourceSuite.scala    |  50 ++++++++++
 .../hadoop/hbase/spark/StartsWithSuite.scala       | 102 +++++++++++++++++++++
 6 files changed, 221 insertions(+)
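
For context, a minimal usage sketch (not part of this commit; the catalog, table name and column names below are placeholders): with this change, a startsWith filter on the row-key column is pushed down to HBase as a bounded scan rather than evaluated over a full table scan.

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.spark.HBaseContext
  import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.col

  val spark = SparkSession.builder().appName("startsWith-pushdown").getOrCreate()

  // The connector picks up the latest HBaseContext, so create one up front.
  new HBaseContext(spark.sparkContext, HBaseConfiguration.create())

  // Placeholder catalog: "col0" maps to the row key of an HBase table "table1".
  val catalog =
    """{
      |"table":{"namespace":"default", "name":"table1"},
      |"rowkey":"key",
      |"columns":{
      |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
      |"col1":{"cf":"cf1", "col":"col1", "type":"string"}
      |}
      |}""".stripMargin

  val df = spark.read
    .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
    .format("org.apache.hadoop.hbase.spark")
    .load()

  // With this patch, StringStartsWith on the row-key column becomes a bounded
  // scan over ["row00", "row01") instead of a full scan with client-side filtering.
  df.filter(col("col0").startsWith("row00"))
    .select("col0", "col1")
    .show()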

diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index 84e9123..0da7373 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -538,6 +538,18 @@ case class HBaseRelation (
           valueArray += byteValue
         }
         new GreaterThanOrEqualLogicExpression(attr, valueArray.length - 1)
+      case StringStartsWith(attr, value) =>
+        val field = catalog.getField(attr)
+        if (field != null) {
+          if (field.isRowKey) {
+            val p = Utils.toBytes(value, field)
+            val endRange = Utils.incrementByteArray(p)
+            parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null, new ScanRange(endRange, false, p, true)))
+          }
+          val byteValue = Utils.toBytes(value, field)
+          valueArray += byteValue
+        }
+        new StartsWithLogicExpression(attr, valueArray.length - 1)
       case Or(left, right) =>
         val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
         val rightSideRowKeyFilter = new RowKeyFilter
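
The StringStartsWith case above turns a row-key prefix into a ScanRange whose lower bound is the prefix itself (inclusive) and whose upper bound is Utils.incrementByteArray(prefix) (exclusive). A short illustration, not part of the patch, using Bytes.toBytes in place of Utils.toBytes since the row key here is a plain string:

  import org.apache.hadoop.hbase.spark.datasources.Utils
  import org.apache.hadoop.hbase.util.Bytes

  val prefix  = Bytes.toBytes("row00")            // inclusive start row
  val stopRow = Utils.incrementByteArray(prefix)  // exclusive stop row, i.e. "row01"

  // Mirrors `new ScanRange(endRange, false, p, true)` above:
  // upper bound exclusive, lower bound inclusive.
  println(Bytes.toString(prefix) + " <= rowkey < " + Bytes.toString(stopRow))
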
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
index 4c35a7b..dab311e 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
@@ -130,6 +130,24 @@ class EqualLogicExpression (val columnName:String,
   }
 }
 
+@InterfaceAudience.Private
+class StartsWithLogicExpression (val columnName:String,
+                            val valueFromQueryIndex:Int) extends DynamicLogicExpression{
+  override def execute(columnToCurrentRowValueMap:
+                       util.HashMap[String, ByteArrayComparable],
+                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
+    val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
+
+    currentRowValue != null && valueFromQuery != null && currentRowValue.length >= valueFromQuery.length &&
+      Bytes.equals(valueFromQuery, 0, valueFromQuery.length, currentRowValue.bytes,
+        currentRowValue.offset, valueFromQuery.length)
+  }
+  override def appendToExpression(strBuilder: StringBuilder): Unit = {
+    strBuilder.append(columnName + " startsWith " + valueFromQueryIndex)
+  }
+}
+
 @InterfaceAudience.Private
 class IsNullLogicExpression (val columnName:String,
                              val isNot:Boolean) extends DynamicLogicExpression{
@@ -242,6 +260,9 @@ object DynamicLogicExpressionBuilder {
         } else if (command.equals("!=")) {
           (new EqualLogicExpression(expressionArray(offSet),
             expressionArray(offSet + 2).toInt, true), offSet + 3)
+        } else if (command.equals("startsWith")) {
+          (new StartsWithLogicExpression(expressionArray(offSet),
+            expressionArray(offSet + 2).toInt), offSet + 3)
         } else if (command.equals("isNull")) {
           (new IsNullLogicExpression(expressionArray(offSet), false), offSet + 2)
         } else if (command.equals("isNotNull")) {
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
index 346983c..fe325f7 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -87,6 +87,10 @@ class HBaseTableScanRDD(relation: HBaseRelation,
         None
       }
     }.toArray
+    if (log.isDebugEnabled) {
+      logDebug(s"Partitions: ${ps.size}");
+      ps.foreach(x => logDebug(x.toString))
+    }
     regions.release()
     ShutdownHookManager.affixShutdownHook( new Thread() {
       override def run() {
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
index 6b96bcc..05d80d4 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
@@ -89,4 +89,36 @@ object Utils {
       }
     }
   }
+
+  // Increment the byte array's value by 1 (unsigned); returns null for an empty or all-0xFF array
+  def incrementByteArray(array: Array[Byte]): Array[Byte] = {
+    if (array.length == 0) {
+      return null
+    }
+    var index = -1 // index of the byte we have to increment
+    var a = array.length - 1
+
+    while (a >= 0) {
+      if (array(a) != (-1).toByte) {
+        index = a
+        a = -1 // break out of the loop: found a byte that is not 0xFF
+      }
+      a = a - 1
+    }
+
+    if (index < 0) {
+      return null
+    }
+    val returnArray = new Array[Byte](array.length)
+
+    for (a <- 0 until index) {
+      returnArray(a) = array(a)
+    }
+    returnArray(index) = (array(index) + 1).toByte
+    for (a <- index + 1 until array.length) {
+      returnArray(a) = 0.toByte
+    }
+
+    returnArray
+  }
 }
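
In short, incrementByteArray treats the array as an unsigned big-endian number: it increments the last byte that is not 0xFF and zeroes everything after it, and it returns null when the array is empty or every byte is 0xFF, in which case the prefix scan simply has no finite upper bound. A quick sketch, not part of the patch:

  import org.apache.hadoop.hbase.spark.datasources.Utils
  import org.apache.hadoop.hbase.util.Bytes

  println(Bytes.toStringBinary(Utils.incrementByteArray(Array(0x01, 0x02).map(_.toByte))))  // \x01\x03
  println(Bytes.toStringBinary(Utils.incrementByteArray(Array(0x01, 0xFF).map(_.toByte))))  // \x02\x00
  println(Utils.incrementByteArray(Array(0xFF, 0xFF).map(_.toByte)))                        // null
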
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 72a84cf..366c9ba 100644
--- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -954,6 +954,56 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     assert(s.count() == 6)
   }
 
+  test("filtered query01") {
+    val sql = sqlContext
+    import sql.implicits._
+    val df = withCatalog(writeCatalog)
+    val s = df.filter(col("col0").startsWith("row00"))
+      .select("col0", "col1")
+    s.show()
+    assert(s.count() == 10)
+  }
+
+  test("startsWith filtered query 1") {
+    val sql = sqlContext
+    import sql.implicits._
+    val df = withCatalog(writeCatalog)
+    val s = df.filter(col("col0").startsWith("row005"))
+      .select("col0", "col1")
+    s.show()
+    assert(s.count() == 1)
+  }
+
+  test("startsWith filtered query 2") {
+    val sql = sqlContext
+    import sql.implicits._
+    val df = withCatalog(writeCatalog)
+    val s = df.filter(col("col0").startsWith("row"))
+      .select("col0", "col1")
+    s.show()
+    assert(s.count() == 256)
+  }
+
+  test("startsWith filtered query 3") {
+    val sql = sqlContext
+    import sql.implicits._
+    val df = withCatalog(writeCatalog)
+    val s = df.filter(col("col0").startsWith("row19"))
+      .select("col0", "col1")
+    s.show()
+    assert(s.count() == 10)
+  }
+
+  test("startsWith filtered query 4") {
+    val sql = sqlContext
+    import sql.implicits._
+    val df = withCatalog(writeCatalog)
+    val s = df.filter(col("col0").startsWith(""))
+      .select("col0", "col1")
+    s.show()
+    assert(s.count() == 256)
+  }
+
   test("Timestamp semantics") {
     val sql = sqlContext
     import sql.implicits._
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/StartsWithSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/StartsWithSuite.scala
new file mode 100644
index 0000000..2dde0ae
--- /dev/null
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/StartsWithSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+
+import org.apache.hadoop.hbase.spark.datasources.Utils
+import org.apache.hadoop.hbase.util.Bytes
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
+class StartsWithSuite extends FunSuite with
+BeforeAndAfterEach with BeforeAndAfterAll with Logging {
+
+  test("simple1") {
+    val t = new Array[Byte](2)
+    t(0) = 1.toByte
+    t(1) = 2.toByte
+
+    val expected = new Array[Byte](2)
+    expected(0) = 1.toByte
+    expected(1) = 3.toByte
+
+    val res = Utils.incrementByteArray(t)
+    assert(res.sameElements(expected))
+  }
+
+  test("simple2") {
+    val t = new Array[Byte](1)
+    t(0) = 87.toByte
+
+    val expected = new Array[Byte](1)
+    expected(0) = 88.toByte
+
+    val res = Utils.incrementByteArray(t)
+    assert(res.sameElements(expected))
+  }
+
+  test("overflow1") {
+    val t = new Array[Byte](2)
+    t(0) = 1.toByte
+    t(1) = (-1).toByte
+
+    val expected = new Array[Byte](2)
+    expected(0) = 2.toByte
+    expected(1) = 0.toByte
+
+    val res = Utils.incrementByteArray(t)
+
+    assert(res.sameElements(expected))
+  }
+
+  test("overflow2") {
+    val t = new Array[Byte](2)
+    t(0) = (-1).toByte
+    t(1) = (-1).toByte
+
+    val expected = null
+
+    val res = Utils.incrementByteArray(t)
+
+    assert(res == expected)
+  }
+
+  test("max-min-value") {
+    val t = new Array[Byte](2)
+    t(0) = 1.toByte
+    t(1) = (127).toByte
+
+    val expected = new Array[Byte](2)
+    expected(0) = 1.toByte
+    expected(1) = (-128).toByte
+
+    val res = Utils.incrementByteArray(t)
+    assert(res.sameElements(expected))
+  }
+
+  test("complicated") {
+    val imput = "row005"
+    val expectedOutput = "row006"
+
+    val t = Bytes.toBytes(input)
+    val expected = Bytes.toBytes(expectedOutput)
+
+    val res = Utils.incrementByteArray(t)
+    assert(res.sameElements(expected))
+  }
+
+}