You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/12/29 00:48:16 UTC

hbase git commit: HBASE-14796 Enhance the Gets in the connector (Zhan Zhang)

Repository: hbase
Updated Branches:
  refs/heads/master 2fba25b66 -> 6868c6366


HBASE-14796 Enhance the Gets in the connector (Zhan Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6868c636
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6868c636
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6868c636

Branch: refs/heads/master
Commit: 6868c6366002d5b4e25980f37ede8839e7a7e92d
Parents: 2fba25b
Author: tedyu <yu...@gmail.com>
Authored: Mon Dec 28 15:48:10 2015 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Mon Dec 28 15:48:10 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/spark/DefaultSource.scala      | 33 ++-----
 .../hadoop/hbase/spark/datasources/Bound.scala  | 24 +++++
 .../spark/datasources/HBaseResources.scala      | 14 +++
 .../spark/datasources/HBaseSparkConf.scala      |  2 +
 .../spark/datasources/HBaseTableScanRDD.scala   | 92 +++++++++++++++++---
 5 files changed, 127 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index 73cab3c..b6d7982 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -159,6 +159,10 @@ case class HBaseRelation (val tableName:String,
     .getOrElse(sqlContext.sparkContext.getConf.getInt(
     HBaseSparkConf.BATCH_NUM,  HBaseSparkConf.defaultBatchNum))
 
+  val bulkGetSize =  parameters.get(HBaseSparkConf.BULKGET_SIZE).map(_.toInt)
+    .getOrElse(sqlContext.sparkContext.getConf.getInt(
+    HBaseSparkConf.BULKGET_SIZE,  HBaseSparkConf.defaultBulkGetSize))
+
   //create or get latest HBaseContext
   @transient val hbaseContext:HBaseContext = if (useHBaseContext) {
     LatestHBaseContextCache.latest
@@ -267,6 +271,7 @@ case class HBaseRelation (val tableName:String,
       None
     }
     val hRdd = new HBaseTableScanRDD(this, pushDownFilterJava, requiredQualifierDefinitionList.seq)
+    pushDownRowKeyFilter.points.foreach(hRdd.addPoint(_))
     pushDownRowKeyFilter.ranges.foreach(hRdd.addRange(_))
     var resultRDD: RDD[Row] = {
       val tmp = hRdd.map{ r =>
@@ -280,34 +285,6 @@ case class HBaseRelation (val tableName:String,
       }
     }
 
-    //If there are gets then we can get them from the driver and union that rdd in
-    // with the rest of the values.
-    if (getList.size() > 0) {
-      val connection =
-        ConnectionFactory.createConnection(hbaseContext.tmpHdfsConfiguration)
-      try {
-        val table = connection.getTable(TableName.valueOf(tableName))
-        try {
-          val results = table.get(getList)
-          val rowList = mutable.MutableList[Row]()
-          for (i <- 0 until results.length) {
-            val rowArray = requiredColumns.map(c =>
-              DefaultSourceStaticUtils.getValue(c, schemaMappingDefinition, results(i)))
-            rowList += Row.fromSeq(rowArray)
-          }
-          val getRDD = sqlContext.sparkContext.parallelize(rowList)
-          if (resultRDD == null) resultRDD = getRDD
-          else {
-            resultRDD = resultRDD.union(getRDD)
-          }
-        } finally {
-          table.close()
-        }
-      } finally {
-        connection.close()
-      }
-    }
-
     if (resultRDD == null) {
       val scan = new Scan()
       scan.setCacheBlocks(blockCacheEnable)

http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
index 0f6098d..8e03e95 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
@@ -87,3 +87,27 @@ object Ranges {
   }
 }
 
+object Points {
+  def and(r: Range, ps: Seq[Array[Byte]]): Seq[Array[Byte]] = {
+    ps.flatMap { p =>
+      if (ord.compare(r.lower.get.b, p) <= 0) {
+        // if region lower bound is less than or equal to the point
+        if (r.upper.isDefined) {
+          // if region upper bound is defined
+          if (ord.compare(r.upper.get.b, p) > 0) {
+            // if the upper bound is greater than the point (because upper bound is exclusive)
+            Some(p)
+          } else {
+            None
+          }
+        } else {
+          // if the region upper bound is not defined (infinity)
+          Some(p)
+        }
+      } else {
+        None
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
index 19a6ea7..14c5fd0 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
@@ -38,6 +38,12 @@ case class ScanResource(tbr: TableResource, rs: ResultScanner) extends Resource
   }
 }
 
+case class GetResource(tbr: TableResource, rs: Array[Result]) extends Resource {
+  def release() {
+    tbr.release()
+  }
+}
+
 trait ReferencedResource {
   var count: Int = 0
   def init(): Unit
@@ -100,6 +106,10 @@ case class TableResource(relation: HBaseRelation) extends ReferencedResource {
   def getScanner(scan: Scan): ScanResource = releaseOnException {
     ScanResource(this, table.getScanner(scan))
   }
+
+  def get(list: java.util.List[org.apache.hadoop.hbase.client.Get]) = releaseOnException {
+    GetResource(this, table.get(list))
+  }
 }
 
 case class RegionResource(relation: HBaseRelation) extends ReferencedResource {
@@ -138,6 +148,10 @@ object HBaseResources{
     sr.rs
   }
 
+  implicit def GetResToResult(gr: GetResource): Array[Result] = {
+    gr.rs
+  }
+
   implicit def TableResToTable(tr: TableResource): Table = {
     tr.table
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index 67580b0..5e11356 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -29,4 +29,6 @@ object HBaseSparkConf{
   val defaultCachingSize = 1000
   val BATCH_NUM = "spark.hbase.batchNum"
   val defaultBatchNum = 1000
+  val BULKGET_SIZE = "spark.hbase.bulkGetSize"
+  val defaultBulkGetSize = 1000
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
index eb9d39a..f288c34 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.hbase.spark.datasources
 
+import java.util.ArrayList
+
 import org.apache.hadoop.hbase.client._
 import org.apache.hadoop.hbase.spark.{ScanRange, SchemaQualifierDefinition, HBaseRelation, SparkSQLPushDownFilter}
 import org.apache.hadoop.hbase.spark.hbase._
@@ -32,7 +34,12 @@ class HBaseTableScanRDD(relation: HBaseRelation,
      val columns: Seq[SchemaQualifierDefinition] = Seq.empty
      )extends RDD[Result](relation.sqlContext.sparkContext, Nil) with Logging  {
   private def sparkConf = SparkEnv.get.conf
-  var ranges = Seq.empty[Range]
+  @transient var ranges = Seq.empty[Range]
+  @transient var points = Seq.empty[Array[Byte]]
+  def addPoint(p: Array[Byte]) {
+    points :+= p
+  }
+
   def addRange(r: ScanRange) = {
     val lower = if (r.lowerBound != null && r.lowerBound.length > 0) {
       Some(Bound(r.lowerBound, r.isLowerBoundEqualTo))
@@ -65,12 +72,13 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     logDebug(s"There are ${regions.size} regions")
     val ps = regions.flatMap { x =>
       val rs = Ranges.and(Range(x), ranges)
-      if (rs.size > 0) {
+      val ps = Points.and(Range(x), points)
+      if (rs.size > 0 || ps.size > 0) {
         if(log.isDebugEnabled) {
           rs.foreach(x => logDebug(x.toString))
         }
         idx += 1
-        Some(HBaseScanPartition(idx - 1, x, rs, SerializedFilter.toSerializedTypedFilter(filter)))
+        Some(HBaseScanPartition(idx - 1, x, rs, ps, SerializedFilter.toSerializedTypedFilter(filter)))
       } else {
         None
       }
@@ -86,6 +94,57 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     }.toSeq
   }
 
+  private def buildGets(
+      tbr: TableResource,
+      g: Seq[Array[Byte]],
+      filter: Option[SparkSQLPushDownFilter],
+      columns: Seq[SchemaQualifierDefinition]): Iterator[Result] = {
+    g.grouped(relation.bulkGetSize).flatMap{ x =>
+      val gets = new ArrayList[Get]()
+      x.foreach{ y =>
+        val g = new Get(y)
+        columns.foreach { d =>
+          if (d.columnFamilyBytes.length > 0) {
+            g.addColumn(d.columnFamilyBytes, d.qualifierBytes)
+          }
+        }
+        filter.foreach(g.setFilter(_))
+        gets.add(g)
+      }
+      val tmp = tbr.get(gets)
+      rddResources.addResource(tmp)
+      toResultIterator(tmp)
+    }
+  }
+
+  private def toResultIterator(result: GetResource): Iterator[Result] = {
+    val iterator = new Iterator[Result] {
+      var idx = 0
+      var cur: Option[Result] = None
+      override def hasNext: Boolean = {
+        while(idx < result.length && cur.isEmpty) {
+          val r = result(idx)
+          idx += 1
+          if (!r.isEmpty) {
+            cur = Some(r)
+          }
+        }
+        if (cur.isEmpty) {
+          rddResources.release(result)
+        }
+        cur.isDefined
+      }
+      override def next(): Result = {
+        hasNext
+        val ret = cur.get
+        cur = None
+        ret
+      }
+    }
+    iterator
+  }
+
+
   private def buildScan(range: Range,
       filter: Option[SparkSQLPushDownFilter],
       columns: Seq[SchemaQualifierDefinition]): Scan = {
@@ -130,6 +189,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     }
     iterator
   }
+
   lazy val rddResources = RDDResources(new mutable.HashSet[Resource]())
 
   private def close() {
@@ -138,18 +198,29 @@ class HBaseTableScanRDD(relation: HBaseRelation,
 
   override def compute(split: Partition, context: TaskContext): Iterator[Result] = {
     val partition = split.asInstanceOf[HBaseScanPartition]
-
+    val filter = SerializedFilter.fromSerializedFilter(partition.sf)
     val scans = partition.scanRanges
-      .map(buildScan(_, SerializedFilter.fromSerializedFilter(partition.sf), columns))
+      .map(buildScan(_, filter, columns))
     val tableResource = TableResource(relation)
     context.addTaskCompletionListener(context => close())
-    val sIts = scans.par
-      .map(tableResource.getScanner(_))
-      .map(toResultIterator(_))
+    val points = partition.points
+    val gIt: Iterator[Result] =  {
+      if (points.isEmpty) {
+        Iterator.empty: Iterator[Result]
+      } else {
+        buildGets(tableResource, points, filter, columns)
+      }
+    }
+    val rIts = scans.par
+      .map { scan =>
+      val scanner = tableResource.getScanner(scan)
+      rddResources.addResource(scanner)
+      scanner
+    }.map(toResultIterator(_))
       .fold(Iterator.empty: Iterator[Result]){ case (x, y) =>
       x ++ y
-    }
-    sIts
+    } ++ gIt
+    rIts
   }
 }
 
@@ -176,6 +247,7 @@ private[hbase] case class HBaseScanPartition(
     override val index: Int,
     val regions: HBaseRegion,
     val scanRanges: Seq[Range],
+    val points: Seq[Array[Byte]],
     val sf: SerializedFilter) extends Partition
 
 case class RDDResources(set: mutable.HashSet[Resource]) {