You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by me...@apache.org on 2022/11/29 12:29:57 UTC
[hbase-connectors] branch master updated: HBASE-27397 Spark-hbase support for 'startWith' predicate (#105)
This is an automated email from the ASF dual-hosted git repository.
meszibalu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new 707e1c5 HBASE-27397 Spark-hbase support for 'startWith' predicate (#105)
707e1c5 is described below
commit 707e1c584a5e1653fb9bfb24d7429c761982adf3
Author: richardantal <an...@gmail.com>
AuthorDate: Tue Nov 29 13:29:52 2022 +0100
HBASE-27397 Spark-hbase support for 'startWith' predicate (#105)
Signed-off-by: Balazs Meszaros <me...@apache.org>
---
.../apache/hadoop/hbase/spark/DefaultSource.scala | 12 +++
.../hbase/spark/DynamicLogicExpression.scala | 21 +++++
.../spark/datasources/HBaseTableScanRDD.scala | 4 +
.../hadoop/hbase/spark/datasources/Utils.scala | 32 +++++++
.../hadoop/hbase/spark/DefaultSourceSuite.scala | 50 ++++++++++
.../hadoop/hbase/spark/StartsWithSuite.scala | 102 +++++++++++++++++++++
6 files changed, 221 insertions(+)
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index 84e9123..0da7373 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -538,6 +538,18 @@ case class HBaseRelation (
valueArray += byteValue
}
new GreaterThanOrEqualLogicExpression(attr, valueArray.length - 1)
+ case StringStartsWith(attr, value) =>
+ val field = catalog.getField(attr)
+ if (field != null) {
+ if (field.isRowKey) {
+ val p = Utils.toBytes(value, field)
+ val endRange = Utils.incrementByteArray(p)
+ parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null, new ScanRange(endRange, false, p, true)))
+ }
+ val byteValue = Utils.toBytes(value, field)
+ valueArray += byteValue
+ }
+ new StartsWithLogicExpression(attr, valueArray.length - 1)
case Or(left, right) =>
val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
val rightSideRowKeyFilter = new RowKeyFilter
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
index 4c35a7b..dab311e 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
@@ -130,6 +130,24 @@ class EqualLogicExpression (val columnName:String,
}
}
@InterfaceAudience.Private
class StartsWithLogicExpression (val columnName:String,
                                 val valueFromQueryIndex:Int) extends DynamicLogicExpression{
  /**
   * Evaluates to true when the current row value for `columnName` begins with
   * the byte prefix stored at `valueFromQueryIndex` in the query value array.
   * A missing row value, a missing prefix, or a row value shorter than the
   * prefix all evaluate to false.
   */
  override def execute(columnToCurrentRowValueMap:
                       util.HashMap[String, ByteArrayComparable],
                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
    val rowValue = columnToCurrentRowValueMap.get(columnName)
    val prefix = valueFromQueryValueArray(valueFromQueryIndex)
    if (rowValue == null || prefix == null || rowValue.length < prefix.length) {
      false
    } else {
      // Compare only the first prefix.length bytes of the row value.
      Bytes.equals(prefix, 0, prefix.length,
        rowValue.bytes, rowValue.offset, prefix.length)
    }
  }
  override def appendToExpression(strBuilder: StringBuilder): Unit = {
    strBuilder.append(columnName + " startsWith " + valueFromQueryIndex)
  }
}
+
@InterfaceAudience.Private
class IsNullLogicExpression (val columnName:String,
val isNot:Boolean) extends DynamicLogicExpression{
@@ -242,6 +260,9 @@ object DynamicLogicExpressionBuilder {
} else if (command.equals("!=")) {
(new EqualLogicExpression(expressionArray(offSet),
expressionArray(offSet + 2).toInt, true), offSet + 3)
+ } else if (command.equals("startsWith")) {
+ (new StartsWithLogicExpression(expressionArray(offSet),
+ expressionArray(offSet + 2).toInt), offSet + 3)
} else if (command.equals("isNull")) {
(new IsNullLogicExpression(expressionArray(offSet), false), offSet + 2)
} else if (command.equals("isNotNull")) {
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
index 346983c..fe325f7 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -87,6 +87,10 @@ class HBaseTableScanRDD(relation: HBaseRelation,
None
}
}.toArray
+ if (log.isDebugEnabled) {
+ logDebug(s"Partitions: ${ps.size}");
+ ps.foreach(x => logDebug(x.toString))
+ }
regions.release()
ShutdownHookManager.affixShutdownHook( new Thread() {
override def run() {
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
index 6b96bcc..05d80d4 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala
@@ -89,4 +89,36 @@ object Utils {
}
}
}
+
+ // increment Byte array's value by 1
+ def incrementByteArray(array: Array[Byte]): Array[Byte] = {
+ if (array.length == 0) {
+ return null
+ }
+ var index = -1 // index of the byte we have to increment
+ var a = array.length - 1
+
+ while (a >= 0) {
+ if (array(a) != (-1).toByte) {
+ index = a
+ a = -1 // break from the loop because we found a non -1 element
+ }
+ a = a - 1
+ }
+
+ if (index < 0) {
+ return null
+ }
+ val returnArray = new Array[Byte](array.length)
+
+ for (a <- 0 until index) {
+ returnArray(a) = array(a)
+ }
+ returnArray(index) = (array(index) + 1).toByte
+ for (a <- index + 1 until array.length) {
+ returnArray(a) = 0.toByte
+ }
+
+ returnArray
+ }
}
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 72a84cf..366c9ba 100644
--- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -954,6 +954,56 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(s.count() == 6)
}
+ test("filtered query01") {
+ val sql = sqlContext
+ import sql.implicits._
+ val df = withCatalog(writeCatalog)
+ val s = df.filter(col("col0").startsWith("row00"))
+ .select("col0", "col1")
+ s.show()
+ assert(s.count() == 10)
+ }
+
+ test("startsWith filtered query 1") {
+ val sql = sqlContext
+ import sql.implicits._
+ val df = withCatalog(writeCatalog)
+ val s = df.filter(col("col0").startsWith("row005"))
+ .select("col0", "col1")
+ s.show()
+ assert(s.count() == 1)
+ }
+
+ test("startsWith filtered query 2") {
+ val sql = sqlContext
+ import sql.implicits._
+ val df = withCatalog(writeCatalog)
+ val s = df.filter(col("col0").startsWith("row"))
+ .select("col0", "col1")
+ s.show()
+ assert(s.count() == 256)
+ }
+
+ test("startsWith filtered query 3") {
+ val sql = sqlContext
+ import sql.implicits._
+ val df = withCatalog(writeCatalog)
+ val s = df.filter(col("col0").startsWith("row19"))
+ .select("col0", "col1")
+ s.show()
+ assert(s.count() == 10)
+ }
+
+ test("startsWith filtered query 4") {
+ val sql = sqlContext
+ import sql.implicits._
+ val df = withCatalog(writeCatalog)
+ val s = df.filter(col("col0").startsWith(""))
+ .select("col0", "col1")
+ s.show()
+ assert(s.count() == 256)
+ }
+
test("Timestamp semantics") {
val sql = sqlContext
import sql.implicits._
diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/StartsWithSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/StartsWithSuite.scala
new file mode 100644
index 0000000..2dde0ae
--- /dev/null
+++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/StartsWithSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+
+import org.apache.hadoop.hbase.spark.datasources.Utils
+import org.apache.hadoop.hbase.util.Bytes
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
class StartsWithSuite extends FunSuite with
BeforeAndAfterEach with BeforeAndAfterAll with Logging {

  test("simple1") {
    // Incrementing {1, 2} bumps the last byte: {1, 3}.
    val input = Array[Byte](1, 2)
    val expected = Array[Byte](1, 3)
    assert(Utils.incrementByteArray(input).sameElements(expected))
  }

  test("simple2") {
    // Single-byte array with no overflow.
    val input = Array[Byte](87)
    val expected = Array[Byte](88)
    assert(Utils.incrementByteArray(input).sameElements(expected))
  }

  test("overflow1") {
    // A trailing 0xFF carries into the preceding byte and resets to 0x00.
    val input = Array[Byte](1, -1)
    val expected = Array[Byte](2, 0)
    assert(Utils.incrementByteArray(input).sameElements(expected))
  }

  test("overflow2") {
    // All bytes are 0xFF: there is no finite successor, so null is expected.
    val input = Array[Byte](-1, -1)
    assert(Utils.incrementByteArray(input) == null)
  }

  test("max-min-value") {
    // Signed-byte wrap-around: incrementing 127 (0x7F) yields -128 (0x80).
    val input = Array[Byte](1, 127)
    val expected = Array[Byte](1, -128)
    assert(Utils.incrementByteArray(input).sameElements(expected))
  }

  test("complicated") {
    // Incrementing the UTF-8 bytes of "row005" yields the bytes of "row006".
    val input = Bytes.toBytes("row005")
    val expected = Bytes.toBytes("row006")
    assert(Utils.incrementByteArray(input).sameElements(expected))
  }

}