Posted to commits@hbase.apache.org by te...@apache.org on 2015/12/29 00:48:16 UTC
hbase git commit: HBASE-14796 Enhance the Gets in the connector (Zhan Zhang)
Repository: hbase
Updated Branches:
refs/heads/master 2fba25b66 -> 6868c6366
HBASE-14796 Enhance the Gets in the connector (Zhan Zhang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6868c636
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6868c636
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6868c636
Branch: refs/heads/master
Commit: 6868c6366002d5b4e25980f37ede8839e7a7e92d
Parents: 2fba25b
Author: tedyu <yu...@gmail.com>
Authored: Mon Dec 28 15:48:10 2015 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Mon Dec 28 15:48:10 2015 -0800
----------------------------------------------------------------------
.../hadoop/hbase/spark/DefaultSource.scala | 33 ++-----
.../hadoop/hbase/spark/datasources/Bound.scala | 24 +++++
.../spark/datasources/HBaseResources.scala | 14 +++
.../spark/datasources/HBaseSparkConf.scala | 2 +
.../spark/datasources/HBaseTableScanRDD.scala | 92 +++++++++++++++++---
5 files changed, 127 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
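In outline: before this patch, row-key point lookups (Gets) were executed on the
driver inside buildScan and unioned into the result RDD; this patch routes them
through HBaseTableScanRDD instead, so each partition issues bulk Gets against its
own region, batched by the new spark.hbase.bulkGetSize setting. A hedged usage
sketch of how the path is exercised from Spark SQL (the "hbase.table" option key
and the column names are illustrative assumptions, not part of this commit):

    // Sketch only: table option key and schema are assumed, not from this diff.
    val df = sqlContext.read
      .options(Map("hbase.table" -> "t1",
        HBaseSparkConf.BULKGET_SIZE -> "1000"))
      .format("org.apache.hadoop.hbase.spark")
      .load()
    // Equality predicates on the row key become "points"; with this patch the
    // resulting Gets run in bulk on the executors, not one-by-one on the driver.
    df.filter(df("col0") === "row005" || df("col0") === "row020").show()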
http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index 73cab3c..b6d7982 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -159,6 +159,10 @@ case class HBaseRelation (val tableName:String,
.getOrElse(sqlContext.sparkContext.getConf.getInt(
HBaseSparkConf.BATCH_NUM, HBaseSparkConf.defaultBatchNum))
+ val bulkGetSize = parameters.get(HBaseSparkConf.BULKGET_SIZE).map(_.toInt)
+ .getOrElse(sqlContext.sparkContext.getConf.getInt(
+ HBaseSparkConf.BULKGET_SIZE, HBaseSparkConf.defaultBulkGetSize))
+
//create or get latest HBaseContext
@transient val hbaseContext:HBaseContext = if (useHBaseContext) {
LatestHBaseContextCache.latest
@@ -267,6 +271,7 @@ case class HBaseRelation (val tableName:String,
None
}
val hRdd = new HBaseTableScanRDD(this, pushDownFilterJava, requiredQualifierDefinitionList.seq)
+ pushDownRowKeyFilter.points.foreach(hRdd.addPoint(_))
pushDownRowKeyFilter.ranges.foreach(hRdd.addRange(_))
var resultRDD: RDD[Row] = {
val tmp = hRdd.map{ r =>
@@ -280,34 +285,6 @@ case class HBaseRelation (val tableName:String,
}
}
- //If there are gets then we can get them from the driver and union that rdd in
- // with the rest of the values.
- if (getList.size() > 0) {
- val connection =
- ConnectionFactory.createConnection(hbaseContext.tmpHdfsConfiguration)
- try {
- val table = connection.getTable(TableName.valueOf(tableName))
- try {
- val results = table.get(getList)
- val rowList = mutable.MutableList[Row]()
- for (i <- 0 until results.length) {
- val rowArray = requiredColumns.map(c =>
- DefaultSourceStaticUtils.getValue(c, schemaMappingDefinition, results(i)))
- rowList += Row.fromSeq(rowArray)
- }
- val getRDD = sqlContext.sparkContext.parallelize(rowList)
- if (resultRDD == null) resultRDD = getRDD
- else {
- resultRDD = resultRDD.union(getRDD)
- }
- } finally {
- table.close()
- }
- } finally {
- connection.close()
- }
- }
-
if (resultRDD == null) {
val scan = new Scan()
scan.setCacheBlocks(blockCacheEnable)
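The deleted block above is the driver-side path this change retires: it opened a
Connection on the driver, ran table.get(getList), and unioned a parallelized RDD
of rows into resultRDD. Its replacement is the single added line in the previous
hunk: points are attached to the scan RDD and resolved per region on the
executors. A minimal sketch of the new routing, using only names from this diff
(the literal row keys are illustrative):

    import org.apache.hadoop.hbase.util.Bytes
    val hRdd = new HBaseTableScanRDD(this, pushDownFilterJava,
      requiredQualifierDefinitionList.seq)
    // Each pushed-down point becomes a Get, planned per region in getPartitions.
    Seq("row1", "row2").map(Bytes.toBytes).foreach(hRdd.addPoint)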
http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
index 0f6098d..8e03e95 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
@@ -87,3 +87,27 @@ object Ranges {
}
}
+object Points {
+ def and(r: Range, ps: Seq[Array[Byte]]): Seq[Array[Byte]] = {
+ ps.flatMap { p =>
+ if (ord.compare(r.lower.get.b, p) <= 0) {
+ // if the region lower bound is less than or equal to the point
+ if (r.upper.isDefined) {
+ // if region upper bound is defined
+ if (ord.compare(r.upper.get.b, p) > 0) {
+ // if the upper bound is greater than the point (because upper bound is exclusive)
+ Some(p)
+ } else {
+ None
+ }
+ } else {
+ // if the region upper bound is not defined (infinity)
+ Some(p)
+ }
+ } else {
+ None
+ }
+ }
+ }
+}
+
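Points.and is the point analogue of Ranges.and: given a region's Range it keeps
exactly those row keys inside [lower, upper), lower bound inclusive, upper bound
exclusive, with an undefined upper bound standing for the end of the table. A
hedged example of the intended behaviour (building Range and Bound directly with
the (bytes, inclusive) shape used by addRange in HBaseTableScanRDD is an
assumption about their constructors):

    import org.apache.hadoop.hbase.util.Bytes
    // Region covering ["b", "d"): keeps "b" and "c", drops "a" and "d".
    val region = Range(Some(Bound(Bytes.toBytes("b"), true)),
      Some(Bound(Bytes.toBytes("d"), false)))
    val kept = Points.and(region, Seq("a", "b", "c", "d").map(Bytes.toBytes))
    kept.map(Bytes.toString)  // Seq("b", "c")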
http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
index 19a6ea7..14c5fd0 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
@@ -38,6 +38,12 @@ case class ScanResource(tbr: TableResource, rs: ResultScanner) extends Resource
}
}
+case class GetResource(tbr: TableResource, rs: Array[Result]) extends Resource {
+ def release() {
+ tbr.release()
+ }
+}
+
trait ReferencedResource {
var count: Int = 0
def init(): Unit
@@ -100,6 +106,10 @@ case class TableResource(relation: HBaseRelation) extends ReferencedResource {
def getScanner(scan: Scan): ScanResource = releaseOnException {
ScanResource(this, table.getScanner(scan))
}
+
+ def get(list: java.util.List[org.apache.hadoop.hbase.client.Get]) = releaseOnException {
+ GetResource(this, table.get(list))
+ }
}
case class RegionResource(relation: HBaseRelation) extends ReferencedResource {
@@ -138,6 +148,10 @@ object HBaseResources{
sr.rs
}
+ implicit def GetResToResult(gr: GetResource): Array[Result] = {
+ gr.rs
+ }
+
implicit def TableResToTable(tr: TableResource): Table = {
tr.table
}
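GetResource completes the resource family: like ScanResource it pairs the result
of a table call with the owning TableResource so that release() hands back the
table's reference count, and the new implicit lets callers treat it directly as
Array[Result]. A short sketch of the pattern, using only names from this diff:

    // With the implicit GetResToResult in scope, gr indexes like Array[Result];
    // toResultIterator in HBaseTableScanRDD relies on exactly this.
    val gr: GetResource = tableResource.get(gets)
    val total = gr.length          // via the implicit conversion
    gr.release()                   // decrement the table's reference count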
http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index 67580b0..5e11356 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -29,4 +29,6 @@ object HBaseSparkConf{
val defaultCachingSize = 1000
val BATCH_NUM = "spark.hbase.batchNum"
val defaultBatchNum = 1000
+ val BULKGET_SIZE = "spark.hbase.bulkGetSize"
+ val defaultBulkGetSize = 1000
}
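The new key follows the pattern of the existing ones: as the DefaultSource hunk
shows, it is read first from the relation's parameters and then from the
SparkConf, with 1000 as the fallback. Either way of setting it should work (a
sketch; only the key string and the fallback value are from this commit):

    // Cluster-wide, via the Spark configuration:
    sparkConf.set("spark.hbase.bulkGetSize", "256")
    // Or per relation, through the data source options:
    sqlContext.read
      .options(Map(HBaseSparkConf.BULKGET_SIZE -> "256"))
      .format("org.apache.hadoop.hbase.spark")
      .load()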
http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
index eb9d39a..f288c34 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -17,6 +17,8 @@
package org.apache.hadoop.hbase.spark.datasources
+import java.util.ArrayList
+
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.spark.{ScanRange, SchemaQualifierDefinition, HBaseRelation, SparkSQLPushDownFilter}
import org.apache.hadoop.hbase.spark.hbase._
@@ -32,7 +34,12 @@ class HBaseTableScanRDD(relation: HBaseRelation,
val columns: Seq[SchemaQualifierDefinition] = Seq.empty
)extends RDD[Result](relation.sqlContext.sparkContext, Nil) with Logging {
private def sparkConf = SparkEnv.get.conf
- var ranges = Seq.empty[Range]
+ @transient var ranges = Seq.empty[Range]
+ @transient var points = Seq.empty[Array[Byte]]
+ def addPoint(p: Array[Byte]) {
+ points :+= p
+ }
+
def addRange(r: ScanRange) = {
val lower = if (r.lowerBound != null && r.lowerBound.length > 0) {
Some(Bound(r.lowerBound, r.isLowerBoundEqualTo))
@@ -65,12 +72,13 @@ class HBaseTableScanRDD(relation: HBaseRelation,
logDebug(s"There are ${regions.size} regions")
val ps = regions.flatMap { x =>
val rs = Ranges.and(Range(x), ranges)
- if (rs.size > 0) {
+ val ps = Points.and(Range(x), points)
+ if (rs.size > 0 || ps.size > 0) {
if(log.isDebugEnabled) {
rs.foreach(x => logDebug(x.toString))
}
idx += 1
- Some(HBaseScanPartition(idx - 1, x, rs, SerializedFilter.toSerializedTypedFilter(filter)))
+ Some(HBaseScanPartition(idx - 1, x, rs, ps, SerializedFilter.toSerializedTypedFilter(filter)))
} else {
None
}
@@ -86,6 +94,57 @@ class HBaseTableScanRDD(relation: HBaseRelation,
}.toSeq
}
+ private def buildGets(
+ tbr: TableResource,
+ g: Seq[Array[Byte]],
+ filter: Option[SparkSQLPushDownFilter],
+ columns: Seq[SchemaQualifierDefinition]): Iterator[Result] = {
+ g.grouped(relation.bulkGetSize).flatMap{ x =>
+ val gets = new ArrayList[Get]()
+ x.foreach{ y =>
+ val g = new Get(y)
+ columns.foreach { d =>
+ if (d.columnFamilyBytes.length > 0) {
+ g.addColumn(d.columnFamilyBytes, d.qualifierBytes)
+ }
+ }
+ filter.foreach(g.setFilter(_))
+ gets.add(g)
+ }
+ val tmp = tbr.get(gets)
+ rddResources.addResource(tmp)
+ toResultIterator(tmp)
+ }
+ }
+
+ private def toResultIterator(result: GetResource): Iterator[Result] = {
+ val iterator = new Iterator[Result] {
+ var idx = 0
+ var cur: Option[Result] = None
+ override def hasNext: Boolean = {
+ while(idx < result.length && cur.isEmpty) {
+ val r = result(idx)
+ idx += 1
+ if (!r.isEmpty) {
+ cur = Some(r)
+ }
+ }
+ if (cur.isEmpty) {
+ rddResources.release(result)
+ }
+ cur.isDefined
+ }
+ override def next(): Result = {
+ hasNext
+ val ret = cur.get
+ cur = None
+ ret
+ }
+ }
+ iterator
+ }
+
+
private def buildScan(range: Range,
filter: Option[SparkSQLPushDownFilter],
columns: Seq[SchemaQualifierDefinition]): Scan = {
@@ -130,6 +189,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
}
iterator
}
+
lazy val rddResources = RDDResources(new mutable.HashSet[Resource]())
private def close() {
@@ -138,18 +198,29 @@ class HBaseTableScanRDD(relation: HBaseRelation,
override def compute(split: Partition, context: TaskContext): Iterator[Result] = {
val partition = split.asInstanceOf[HBaseScanPartition]
-
+ val filter = SerializedFilter.fromSerializedFilter(partition.sf)
val scans = partition.scanRanges
- .map(buildScan(_, SerializedFilter.fromSerializedFilter(partition.sf), columns))
+ .map(buildScan(_, filter, columns))
val tableResource = TableResource(relation)
context.addTaskCompletionListener(context => close())
- val sIts = scans.par
- .map(tableResource.getScanner(_))
- .map(toResultIterator(_))
+ val points = partition.points
+ val gIt: Iterator[Result] = {
+ if (points.isEmpty) {
+ Iterator.empty: Iterator[Result]
+ } else {
+ buildGets(tableResource, points, filter, columns)
+ }
+ }
+ val rIts = scans.par
+ .map { scan =>
+ val scanner = tableResource.getScanner(scan)
+ rddResources.addResource(scanner)
+ scanner
+ }.map(toResultIterator(_))
.fold(Iterator.empty: Iterator[Result]){ case (x, y) =>
x ++ y
- }
- sIts
+ } ++ gIt
+ rIts
}
}
@@ -176,6 +247,7 @@ private[hbase] case class HBaseScanPartition(
override val index: Int,
val regions: HBaseRegion,
val scanRanges: Seq[Range],
+ val points: Seq[Array[Byte]],
val sf: SerializedFilter) extends Partition
case class RDDResources(set: mutable.HashSet[Resource]) {
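Net effect in compute: the parallel scan iterators are folded into one iterator
as before, and the bulk-get iterator for the partition's points is appended with
++, so a task yields its scan results followed by its get results. buildGets
chunks the points with grouped(relation.bulkGetSize), one multi-get RPC per
chunk; a standalone sketch of that batching (sizes are illustrative):

    import org.apache.hadoop.hbase.util.Bytes
    // 2500 points with bulkGetSize = 1000 -> batches of 1000, 1000, 500,
    // each sent as a single table.get(batch) round trip.
    val points = (1 to 2500).map(i => Bytes.toBytes(f"row$i%04d"))
    val sizes = points.grouped(1000).map(_.size).toList  // List(1000, 1000, 500)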