Posted to commits@hbase.apache.org by sy...@apache.org on 2015/12/15 18:43:34 UTC
[20/26] hbase git commit: HBASE-14795 Enhance the spark-hbase scan operations (Zhan Zhang)
HBASE-14795 Enhance the spark-hbase scan operations (Zhan Zhang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/676ce01c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/676ce01c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/676ce01c
Branch: refs/heads/hbase-12439
Commit: 676ce01c82c137348e88d0acaa694ad214dc2f12
Parents: f34d3e1
Author: tedyu <yu...@gmail.com>
Authored: Sun Dec 13 18:26:54 2015 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Sun Dec 13 18:26:54 2015 -0800
----------------------------------------------------------------------
.../hadoop/hbase/spark/DefaultSource.scala | 100 +++-------
.../hadoop/hbase/spark/HBaseContext.scala | 2 +-
.../hadoop/hbase/spark/datasources/Bound.scala | 91 +++++++++
.../spark/datasources/HBaseResources.scala | 147 ++++++++++++++
.../spark/datasources/HBaseTableScanRDD.scala | 199 +++++++++++++++++++
.../datasources/SerializableConfiguration.scala | 46 +++++
.../hbase/spark/datasources/package.scala | 36 ++++
7 files changed, 545 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index 23480bb..664cf57 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -20,11 +20,11 @@ package org.apache.hadoop.hbase.spark
import java.util
import java.util.concurrent.ConcurrentLinkedQueue
-import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result, Scan}
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.spark.datasources.{HBaseTableScanRDD, HBaseRegion, SerializableConfiguration}
import org.apache.hadoop.hbase.types._
-import org.apache.hadoop.hbase.util.{SimplePositionedMutableByteRange,
-PositionedByteRange, Bytes}
-import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.util.{Bytes, PositionedByteRange, SimplePositionedMutableByteRange}
+import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.DataType
@@ -159,7 +159,7 @@ class DefaultSource extends RelationProvider with Logging {
* connection information
* @param sqlContext SparkSQL context
*/
-class HBaseRelation (val tableName:String,
+case class HBaseRelation (val tableName:String,
val schemaMappingDefinition:
java.util.HashMap[String, SchemaQualifierDefinition],
val batchingNum:Int,
@@ -179,6 +179,9 @@ class HBaseRelation (val tableName:String,
new HBaseContext(sqlContext.sparkContext, config)
}
+ val wrappedConf = new SerializableConfiguration(hbaseContext.config)
+ def hbaseConf = wrappedConf.value
+
/**
* Generates a Spark SQL schema object so Spark SQL knows what is being
* provided by this BaseRelation
@@ -222,6 +225,7 @@ class HBaseRelation (val tableName:String,
*/
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+
val pushDownTuple = buildPushDownPredicatesResource(filters)
val pushDownRowKeyFilter = pushDownTuple._1
var pushDownDynamicLogicExpression = pushDownTuple._2
@@ -253,7 +257,6 @@ class HBaseRelation (val tableName:String,
//retain the information for unit testing checks
DefaultSourceStaticUtils.populateLatestExecutionRules(pushDownRowKeyFilter,
pushDownDynamicLogicExpression)
- var resultRDD: RDD[Row] = null
val getList = new util.ArrayList[Get]()
val rddList = new util.ArrayList[RDD[Row]]()
@@ -268,77 +271,24 @@ class HBaseRelation (val tableName:String,
getList.add(get)
})
- val rangeIt = pushDownRowKeyFilter.ranges.iterator
-
- while (rangeIt.hasNext) {
- val r = rangeIt.next()
-
- val scan = new Scan()
- scan.setBatch(batchingNum)
- scan.setCaching(cachingNum)
- requiredQualifierDefinitionList.foreach( d =>
- if (d.columnFamilyBytes.length > 0)
- scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
-
- if (usePushDownColumnFilter && pushDownDynamicLogicExpression != null) {
- val pushDownFilterJava =
- new SparkSQLPushDownFilter(pushDownDynamicLogicExpression,
- valueArray, requiredQualifierDefinitionList)
-
- scan.setFilter(pushDownFilterJava)
- }
-
- //Check if there is a lower bound
- if (r.lowerBound != null && r.lowerBound.length > 0) {
-
- if (r.isLowerBoundEqualTo) {
- //HBase startRow is inclusive: Therefore it acts like isLowerBoundEqualTo
- // by default
- scan.setStartRow(r.lowerBound)
- } else {
- //Since we don't equalTo we want the next value we need
- // to add another byte to the start key. That new byte will be
- // the min byte value.
- val newArray = new Array[Byte](r.lowerBound.length + 1)
- System.arraycopy(r.lowerBound, 0, newArray, 0, r.lowerBound.length)
-
- //new Min Byte
- newArray(r.lowerBound.length) = Byte.MinValue
- scan.setStartRow(newArray)
- }
+ val pushDownFilterJava = if (usePushDownColumnFilter && pushDownDynamicLogicExpression != null) {
+ Some(new SparkSQLPushDownFilter(pushDownDynamicLogicExpression,
+ valueArray, requiredQualifierDefinitionList))
+ } else {
+ None
+ }
+ val hRdd = new HBaseTableScanRDD(this, pushDownFilterJava, requiredQualifierDefinitionList.seq)
+ pushDownRowKeyFilter.ranges.foreach(hRdd.addRange(_))
+ var resultRDD: RDD[Row] = {
+ val tmp = hRdd.map{ r =>
+ Row.fromSeq(requiredColumns.map(c =>
+ DefaultSourceStaticUtils.getValue(c, serializableDefinitionMap, r)))
}
-
- //Check if there is an upperBound
- if (r.upperBound != null && r.upperBound.length > 0) {
- if (r.isUpperBoundEqualTo) {
- //HBase stopRow is exclusive: therefore it DOESN'T act like isUpperBoundEqualTo
- // by default. So we need to add a new max byte to the stopRow key
- val newArray = new Array[Byte](r.upperBound.length + 1)
- System.arraycopy(r.upperBound, 0, newArray, 0, r.upperBound.length)
-
- //New Max Bytes
- newArray(r.upperBound.length) = Byte.MaxValue
- scan.setStopRow(newArray)
- } else {
- //Here equalTo is false for Upper bound which is exclusive and
- // HBase stopRow acts like that by default so no need to mutate the
- // rowKey
- scan.setStopRow(r.upperBound)
- }
+ if (tmp.partitions.size > 0) {
+ tmp
+ } else {
+ null
}
-
- val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
- Row.fromSeq(requiredColumns.map(c =>
- DefaultSourceStaticUtils.getValue(c, serializableDefinitionMap, r._2)))
- })
- rddList.add(rdd)
- }
-
- //If there is more than one RDD then we have to union them together
- for (i <- 0 until rddList.size()) {
- if (resultRDD == null) resultRDD = rddList.get(i)
- else resultRDD = resultRDD.union(rddList.get(i))
-
}
//If there are gets then we can get them from the driver and union that rdd in
http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index f8c24f2..2d21e69 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -57,7 +57,7 @@ import scala.collection.mutable
* to the working and managing the life cycle of HConnections.
*/
class HBaseContext(@transient sc: SparkContext,
- @transient config: Configuration,
+ @transient val config: Configuration,
val tmpHdfsConfgFile: String = null)
extends Serializable with Logging {
http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
new file mode 100644
index 0000000..c869f31
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.datasources
+
+import org.apache.hadoop.hbase.spark.SparkSQLPushDownFilter
+import org.apache.spark.Partition
+import org.apache.hadoop.hbase.spark.hbase._
+
+/**
+ * The Bound represents the boundary for the scan
+ *
+ * @param b The byte array of the bound
+ * @param inc inclusive or not.
+ */
+case class Bound(b: Array[Byte], inc: Boolean)
+// The non-overlapping ranges we need to scan. If lower equals upper, it is a get request
+case class Range(lower: Option[Bound], upper: Option[Bound])
+object Range {
+ def apply(region: HBaseRegion): Range = {
+ Range(region.start.map(Bound(_, true)), if (region.end.get.size == 0) {
+ None
+ } else {
+ region.end.map((Bound(_, false)))
+ })
+ }
+}
+
+object Ranges {
+ // We assume that
+ // 1. r.lower.inc is true, and r.upper.inc is false
+ // 2. for each range in rs, its upper.inc is false
+ def and(r: Range, rs: Seq[Range]): Seq[Range] = {
+ rs.flatMap{ s =>
+ val lower = s.lower.map { x =>
+ // the scan has lower bound
+ r.lower.map { y =>
+ // the region has lower bound
+ if (ord.compare(x.b, y.b) < 0) {
+ // scan lower bound is smaller than region server lower bound
+ Some(y)
+ } else {
+ // scan lower bound is greater than or equal to region server lower bound
+ Some(x)
+ }
+ }.getOrElse(Some(x))
+ }.getOrElse(r.lower)
+
+ val upper = s.upper.map { x =>
+ // the scan has upper bound
+ r.upper.map { y =>
+ // the region has upper bound
+ if (ord.compare(x.b, y.b) >= 0) {
+ // scan upper bound is greater than or equal to the region upper bound,
+ // so clamp to the region's end key; the scan stop row is exclusive, so this is safe
+ Some(y)
+ } else {
+ // scan upper bound is less than the region server upper bound
+ Some(x)
+ }
+ }.getOrElse(Some(x))
+ }.getOrElse(r.upper)
+
+ val c = lower.map { case x =>
+ upper.map { case y =>
+ ord.compare(x.b, y.b)
+ }.getOrElse(-1)
+ }.getOrElse(-1)
+ if (c < 0) {
+ Some(Range(lower, upper))
+ } else {
+ None
+ }
+ }.seq
+ }
+}
+
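A minimal sketch (not part of this patch) of how Ranges.and above clips the requested scan ranges to a single region's key range. Bound, Range, and Ranges are the classes defined in this file; the object name and the literal row keys are illustrative only.

import org.apache.hadoop.hbase.spark.datasources.{Bound, Range, Ranges}
import org.apache.hadoop.hbase.util.Bytes

object RangeIntersectionSketch {
  def main(args: Array[String]): Unit = {
    // The region covers [b, f): lower bound inclusive, upper bound exclusive,
    // matching the assumptions documented on Ranges.and above.
    val region = Range(
      Some(Bound(Bytes.toBytes("b"), true)),
      Some(Bound(Bytes.toBytes("f"), false)))
    // Two requested scan ranges: [a, d) and [e, z).
    val requested = Seq(
      Range(Some(Bound(Bytes.toBytes("a"), true)), Some(Bound(Bytes.toBytes("d"), false))),
      Range(Some(Bound(Bytes.toBytes("e"), true)), Some(Bound(Bytes.toBytes("z"), false))))
    // Expected clipped ranges: [b, d) and [e, f); ranges outside the region are dropped.
    Ranges.and(region, requested).foreach { r =>
      println(Bytes.toString(r.lower.get.b) + " .. " + Bytes.toString(r.upper.get.b))
    }
  }
}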
http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
new file mode 100644
index 0000000..4417520
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.datasources
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.spark.HBaseRelation
+
+// Resource and ReferencedResource are defined for extensibility,
+// e.g., to consolidate scan and bulkGet in future work.
+
+// The user has to invoke release() explicitly to release the resource,
+// and potentially its parent resources.
+trait Resource {
+ def release(): Unit
+}
+
+case class ScanResource(tbr: TableResource, rs: ResultScanner) extends Resource {
+ def release() {
+ rs.close()
+ tbr.release()
+ }
+}
+
+trait ReferencedResource {
+ var count: Int = 0
+ def init(): Unit
+ def destroy(): Unit
+ def acquire() = synchronized {
+ try {
+ count += 1
+ if (count == 1) {
+ init()
+ }
+ } catch {
+ case e: Throwable =>
+ release()
+ throw e
+ }
+ }
+
+ def release() = synchronized {
+ count -= 1
+ if (count == 0) {
+ destroy()
+ }
+ }
+
+ def releaseOnException[T](func: => T): T = {
+ acquire()
+ val ret = {
+ try {
+ func
+ } catch {
+ case e: Throwable =>
+ release()
+ throw e
+ }
+ }
+ ret
+ }
+}
+
+case class TableResource(relation: HBaseRelation) extends ReferencedResource {
+ var connection: Connection = _
+ var table: Table = _
+
+ override def init(): Unit = {
+ connection = ConnectionFactory.createConnection(relation.hbaseConf)
+ table = connection.getTable(TableName.valueOf(relation.tableName))
+ }
+
+ override def destroy(): Unit = {
+ if (table != null) {
+ table.close()
+ table = null
+ }
+ if (connection != null) {
+ connection.close()
+ connection = null
+ }
+ }
+
+ def getScanner(scan: Scan): ScanResource = releaseOnException {
+ ScanResource(this, table.getScanner(scan))
+ }
+}
+
+case class RegionResource(relation: HBaseRelation) extends ReferencedResource {
+ var connection: Connection = _
+ var rl: RegionLocator = _
+ val regions = releaseOnException {
+ val keys = rl.getStartEndKeys
+ keys.getFirst.zip(keys.getSecond)
+ .zipWithIndex
+ .map(x =>
+ HBaseRegion(x._2,
+ Some(x._1._1),
+ Some(x._1._2),
+ Some(rl.getRegionLocation(x._1._1).getHostname)))
+ }
+
+ override def init(): Unit = {
+ connection = ConnectionFactory.createConnection(relation.hbaseConf)
+ rl = connection.getRegionLocator(TableName.valueOf(relation.tableName))
+ }
+
+ override def destroy(): Unit = {
+ if (rl != null) {
+ rl.close()
+ rl = null
+ }
+ if (connection != null) {
+ connection.close()
+ connection = null
+ }
+ }
+}
+
+object HBaseResources{
+ implicit def ScanResToScan(sr: ScanResource): ResultScanner = {
+ sr.rs
+ }
+
+ implicit def TableResToTable(tr: TableResource): Table = {
+ tr.table
+ }
+
+ implicit def RegionResToRegions(rr: RegionResource): Seq[HBaseRegion] = {
+ rr.regions
+ }
+}
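A minimal sketch (not part of this patch) of the ReferencedResource contract defined above: init() runs when the reference count goes from 0 to 1, destroy() when it drops back to 0, and releaseOnException() undoes its acquire only if the wrapped body throws. CountingResource and the printed values are illustrative only.

import org.apache.hadoop.hbase.spark.datasources.ReferencedResource

object ReferencedResourceSketch {
  // Hypothetical implementation tracking whether the underlying resource is open.
  class CountingResource extends ReferencedResource {
    var live = false
    override def init(): Unit = { live = true }
    override def destroy(): Unit = { live = false }
  }

  def main(args: Array[String]): Unit = {
    val r = new CountingResource
    // releaseOnException acquires, runs the body, and releases only if the body
    // throws; on success the caller still owns one reference.
    val v = r.releaseOnException { 42 }
    println((v, r.live))   // (42,true): still acquired
    r.release()            // the matching release drops the count to 0
    println(r.live)        // false: destroy() ran
  }
}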
http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
new file mode 100644
index 0000000..958a986
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.datasources
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.filter.Filter
+import org.apache.hadoop.hbase.spark.{ScanRange, SchemaQualifierDefinition, HBaseRelation, SparkSQLPushDownFilter}
+import org.apache.hadoop.hbase.spark.hbase._
+import org.apache.hadoop.hbase.spark.datasources.HBaseResources._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.{TaskContext, Logging, Partition}
+import org.apache.spark.rdd.RDD
+
+import scala.collection.mutable
+
+
+class HBaseTableScanRDD(relation: HBaseRelation,
+ @transient val filter: Option[SparkSQLPushDownFilter] = None,
+ val columns: Seq[SchemaQualifierDefinition] = Seq.empty
+ )extends RDD[Result](relation.sqlContext.sparkContext, Nil) with Logging {
+ var ranges = Seq.empty[Range]
+ def addRange(r: ScanRange) = {
+ val lower = if (r.lowerBound != null && r.lowerBound.length > 0) {
+ Some(Bound(r.lowerBound, r.isLowerBoundEqualTo))
+ } else {
+ None
+ }
+ val upper = if (r.upperBound != null && r.upperBound.length > 0) {
+ if (!r.isUpperBoundEqualTo) {
+ Some(Bound(r.upperBound, false))
+ } else {
+
+ // HBase stopRow is exclusive: therefore it DOESN'T act like isUpperBoundEqualTo
+ // by default, so we append a trailing byte to the stopRow key to keep the upper bound included
+ val newArray = new Array[Byte](r.upperBound.length + 1)
+ System.arraycopy(r.upperBound, 0, newArray, 0, r.upperBound.length)
+
+ // Append the minimum byte: the smallest key strictly greater than the original upper bound
+ newArray(r.upperBound.length) = ByteMin
+ Some(Bound(newArray, false))
+ }
+ } else {
+ None
+ }
+ ranges :+= Range(lower, upper)
+ }
+
+ override def getPartitions: Array[Partition] = {
+ val regions = RegionResource(relation)
+ var idx = 0
+ logDebug(s"There are ${regions.size} regions")
+ val ps = regions.flatMap { x =>
+ val rs = Ranges.and(Range(x), ranges)
+ if (rs.size > 0) {
+ if(log.isDebugEnabled) {
+ rs.foreach(x => logDebug(x.toString))
+ }
+ idx += 1
+ Some(HBaseScanPartition(idx - 1, x, rs, SerializedFilter.toSerializedTypedFilter(filter)))
+ } else {
+ None
+ }
+ }.toArray
+ regions.release()
+ ps.asInstanceOf[Array[Partition]]
+ }
+
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ split.asInstanceOf[HBaseScanPartition].regions.server.map {
+ identity
+ }.toSeq
+ }
+
+ private def buildScan(range: Range,
+ filter: Option[SparkSQLPushDownFilter],
+ columns: Seq[SchemaQualifierDefinition]): Scan = {
+ val scan = (range.lower, range.upper) match {
+ case (Some(Bound(a, b)), Some(Bound(c, d))) => new Scan(a, c)
+ case (None, Some(Bound(c, d))) => new Scan(Array[Byte](), c)
+ case (Some(Bound(a, b)), None) => new Scan(a)
+ case (None, None) => new Scan()
+ }
+
+ columns.foreach { d =>
+ if (d.columnFamilyBytes.length > 0) {
+ scan.addColumn(d.columnFamilyBytes, d.qualifierBytes)
+ }
+ }
+ scan.setBatch(relation.batchingNum)
+ scan.setCaching(relation.cachingNum)
+ filter.foreach(scan.setFilter(_))
+ scan
+ }
+ private def toResultIterator(scanner: ScanResource): Iterator[Result] = {
+ val iterator = new Iterator[Result] {
+ var cur: Option[Result] = None
+ override def hasNext: Boolean = {
+ if (cur.isEmpty) {
+ val r = scanner.next()
+ if (r == null) {
+ rddResources.release(scanner)
+ } else {
+ cur = Some(r)
+ }
+ }
+ cur.isDefined
+ }
+ override def next(): Result = {
+ hasNext
+ val ret = cur.get
+ cur = None
+ ret
+ }
+ }
+ iterator
+ }
+ lazy val rddResources = RDDResources(new mutable.HashSet[Resource]())
+
+ private def close() {
+ rddResources.release()
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[Result] = {
+ val partition = split.asInstanceOf[HBaseScanPartition]
+
+ val scans = partition.scanRanges
+ .map(buildScan(_, SerializedFilter.fromSerializedFilter(partition.sf), columns))
+ val tableResource = TableResource(relation)
+ context.addTaskCompletionListener(context => close())
+ val sIts = scans.par
+ .map(tableResource.getScanner(_))
+ .map(toResultIterator(_))
+ .fold(Iterator.empty: Iterator[Result]){ case (x, y) =>
+ x ++ y
+ }
+ sIts
+ }
+}
+
+case class SerializedFilter(b: Option[Array[Byte]])
+
+object SerializedFilter {
+ def toSerializedTypedFilter(f: Option[SparkSQLPushDownFilter]): SerializedFilter = {
+ SerializedFilter(f.map(_.toByteArray))
+ }
+
+ def fromSerializedFilter(sf: SerializedFilter): Option[SparkSQLPushDownFilter] = {
+ sf.b.map(SparkSQLPushDownFilter.parseFrom(_))
+ }
+}
+
+private[hbase] case class HBaseRegion(
+ override val index: Int,
+ val start: Option[HBaseType] = None,
+ val end: Option[HBaseType] = None,
+ val server: Option[String] = None) extends Partition
+
+
+private[hbase] case class HBaseScanPartition(
+ override val index: Int,
+ val regions: HBaseRegion,
+ val scanRanges: Seq[Range],
+ val sf: SerializedFilter) extends Partition
+
+case class RDDResources(set: mutable.HashSet[Resource]) {
+ def addResource(s: Resource) {
+ set += s
+ }
+ def release() {
+ set.foreach(release(_))
+ }
+ def release(rs: Resource) {
+ try {
+ rs.release()
+ } finally {
+ set.remove(rs)
+ }
+ }
+}
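A minimal, self-contained sketch (not part of this patch) of the pattern toResultIterator above relies on: a pull-based iterator over a null-terminated source that releases its resource as soon as the source is exhausted, rather than waiting for task teardown. All names here are illustrative.

import java.util.concurrent.atomic.AtomicBoolean

object ClosingIteratorSketch {
  // Wrap a null-terminated source in an Iterator that invokes close() once the
  // source reports exhaustion, mirroring the hasNext/next handshake above.
  def closingIterator[A <: AnyRef](nextOrNull: () => A, close: () => Unit): Iterator[A] =
    new Iterator[A] {
      private var cur: Option[A] = None
      override def hasNext: Boolean = {
        if (cur.isEmpty) {
          val r = nextOrNull()
          if (r == null) close() else cur = Some(r)
        }
        cur.isDefined
      }
      override def next(): A = { hasNext; val ret = cur.get; cur = None; ret }
    }

  def main(args: Array[String]): Unit = {
    val source = Iterator("a", "b", "c")
    val closed = new AtomicBoolean(false)
    val it = closingIterator[String](
      () => if (source.hasNext) source.next() else null,
      () => closed.set(true))
    println(it.toList)    // List(a, b, c)
    println(closed.get()) // true: released once the source returned null
  }
}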
http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
new file mode 100644
index 0000000..2452a52
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.datasources
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.util.Utils
+
+import scala.util.control.NonFatal
+
+class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
+ private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException {
+ out.defaultWriteObject()
+ value.write(out)
+ }
+
+ private def readObject(in: ObjectInputStream): Unit = tryOrIOException {
+ value = new Configuration(false)
+ value.readFields(in)
+ }
+
+ def tryOrIOException(block: => Unit) {
+ try {
+ block
+ } catch {
+ case e: IOException => throw e
+ case NonFatal(t) => throw new IOException(t)
+ }
+ }
+}
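Hadoop's Configuration is not java.io.Serializable, which is why HBaseRelation wraps it in the class above before it is captured by executor-side code. A minimal sketch (not part of this patch) that round-trips a configuration through Java serialization the way a Spark task closure would; the object name and the config key/value are illustrative only.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.spark.datasources.SerializableConfiguration

object SerializableConfigurationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("hbase.zookeeper.quorum", "zk-host")

    // Serialize the wrapper; writeObject delegates to Configuration.write.
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(new SerializableConfiguration(conf))
    out.close()

    // Deserialize; readObject rebuilds the Configuration via readFields.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val restored = in.readObject().asInstanceOf[SerializableConfiguration].value
    println(restored.get("hbase.zookeeper.quorum"))  // zk-host
  }
}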
http://git-wip-us.apache.org/repos/asf/hbase/blob/676ce01c/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
new file mode 100644
index 0000000..4ff0413
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.util.Bytes
+
+import scala.math.Ordering
+
+package object hbase {
+ type HBaseType = Array[Byte]
+ val ByteMax = -1.asInstanceOf[Byte] // 0xff, the largest byte value under HBase's unsigned ordering
+ val ByteMin = 0.asInstanceOf[Byte] // 0x00, the smallest byte value under HBase's unsigned ordering
+ val ord: Ordering[HBaseType] = new Ordering[HBaseType] {
+ def compare(x: Array[Byte], y: Array[Byte]): Int = {
+ return Bytes.compareTo(x, y)
+ }
+ }
+ //Do not use BinaryType.ordering
+ implicit val order: Ordering[HBaseType] = ord
+
+}
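A minimal sketch (not part of this patch) of why ord above compares with Bytes.compareTo: HBase orders row keys as unsigned bytes, so 0xFF (ByteMax, -1 as a signed Byte) sorts after 0x01, while a signed Byte comparison would say the opposite, which is presumably why the comment above warns against BinaryType.ordering. The object name is illustrative only.

import org.apache.hadoop.hbase.util.Bytes

object UnsignedOrderingSketch {
  def main(args: Array[String]): Unit = {
    val one = Array(0x01.toByte)
    val ff  = Array(0xFF.toByte)  // -1 as a signed Byte; ByteMax above

    // Unsigned comparison, as used by `ord` and by HBase row-key ordering.
    println(Bytes.compareTo(one, ff) < 0)  // true: 0x01 < 0xFF unsigned
    // Signed comparison of the raw bytes disagrees.
    println(0x01.toByte < 0xFF.toByte)     // false: 1 > -1 signed
  }
}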