Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/09 04:59:52 UTC
[6/9] hbase git commit: HBASE-18817 pull the hbase-spark module out of branch-2.
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
deleted file mode 100644
index 2469c8e..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.util
-
-import org.apache.hadoop.hbase.{HConstants, TableName}
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.spark.rdd.RDD
-
-import scala.reflect.ClassTag
-
-/**
- * HBaseRDDFunctions contains a set of implicit functions that can be
- * applied to a Spark RDD so that we can easily interact with HBase
- */
-@InterfaceAudience.Public
-object HBaseRDDFunctions
-{
-
- /**
- * These are implicit methods for an RDD that contains any type of
- * data.
- *
- * @param rdd This is for an RDD of any type
- * @tparam T This is any type
- */
- implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) {
-
- /**
- * Implicit method that gives easy access to HBaseContext's bulk
- * put. This will not return a new RDD. Think of it like a foreach
- *
- * @param hc The hbaseContext object to identify which
- * HBase cluster connection to use
- * @param tableName The tableName that the put will be sent to
- * @param f The function that will turn the RDD values
- * into HBase Put objects.
- */
- def hbaseBulkPut(hc: HBaseContext,
- tableName: TableName,
- f: (T) => Put): Unit = {
- hc.bulkPut(rdd, tableName, f)
- }
-
- /**
- * Implicit method that gives easy access to HBaseContext's bulk
- * get. This will return a new RDD. Think of it as an RDD map
- * function, in that every RDD value will get a new value out of
- * HBase. That new value will populate the newly generated RDD.
- *
- * @param hc The hbaseContext object to identify which
- * HBase cluster connection to use
- * @param tableName The tableName that the gets will be sent to
- * @param batchSize How many gets to execute in a single batch
- * @param f The function that will turn the RDD values
- * into HBase Get objects
- * @param convertResult The function that will convert a HBase
- * Result object into a value that will go
- * into the resulting RDD
- * @tparam R The type of Object that will be coming
- * out of the resulting RDD
- * @return A resulting RDD with type R objects
- */
- def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
- tableName: TableName, batchSize:Int,
- f: (T) => Get, convertResult: (Result) => R): RDD[R] = {
- hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult)
- }
-
- /**
- * Implicit method that gives easy access to HBaseContext's bulk
- * get. This will return a new RDD. Think of it as an RDD map
- * function, in that every RDD value will get a new value out of
- * HBase. That new value will populate the newly generated RDD.
- *
- * @param hc The hbaseContext object to identify which
- * HBase cluster connection to use
- * @param tableName The tableName that the gets will be sent to
- * @param batchSize How many gets to execute in a single batch
- * @param f The function that will turn the RDD values
- * into HBase Get objects
- * @return A resulting RDD with type R objects
- */
- def hbaseBulkGet(hc: HBaseContext,
- tableName: TableName, batchSize:Int,
- f: (T) => Get): RDD[(ImmutableBytesWritable, Result)] = {
- hc.bulkGet[T, (ImmutableBytesWritable, Result)](tableName,
- batchSize, rdd, f,
- result => if (result != null && result.getRow != null) {
- (new ImmutableBytesWritable(result.getRow), result)
- } else {
- null
- })
- }
-
- /**
- * Implicit method that gives easy access to HBaseContext's bulk
- * Delete. This will not return a new RDD.
- *
- * @param hc The hbaseContext object to identify which HBase
- * cluster connection to use
- * @param tableName The tableName that the deletes will be sent to
- * @param f The function that will convert the RDD value into
- * a HBase Delete Object
- * @param batchSize The number of Deletes to be sent in a single batch
- */
- def hbaseBulkDelete(hc: HBaseContext,
- tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = {
- hc.bulkDelete(rdd, tableName, f, batchSize)
- }
-
- /**
- * Implicit method that gives easy access to HBaseContext's
- * foreachPartition method. This will act very much like a normal RDD
- * foreach method except that you will now have an HBase connection
- * while iterating through the values.
- *
- * @param hc The hbaseContext object to identify which HBase
- * cluster connection to use
- * @param f This function will get an iterator for a Partition of an
- * RDD along with a connection object to HBase
- */
- def hbaseForeachPartition(hc: HBaseContext,
- f: (Iterator[T], Connection) => Unit): Unit = {
- hc.foreachPartition(rdd, f)
- }
-
- /**
- * Implicit method that gives easy access to HBaseContext's
- * mapPartitions method. This will act very much like a normal RDD
- * mapPartitions method except that you will now have an
- * HBase connection while iterating through the values
- *
- * @param hc The hbaseContext object to identify which HBase
- * cluster connection to use
- * @param f This function will get an iterator for a Partition of an
- * RDD along with a connection object to HBase
- * @tparam R This is the type of objects that will go into the resulting
- * RDD
- * @return A resulting RDD of type R
- */
- def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
- f: (Iterator[T], Connection) => Iterator[R]):
- RDD[R] = {
- hc.mapPartitions[T,R](rdd, f)
- }
-
- /**
- * A Spark implementation of HBase bulk load for wide rows, or for when
- * values are not already combined at the time of the map process.
- *
- * This will take the content from an existing RDD then sort and shuffle
- * it with respect to region splits. The result of that sort and shuffle
- * will be written to HFiles.
- *
- * After this function is executed the user will have to call
- * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
- *
- * Also note this version of bulk load is different from past versions in
- * that it includes the qualifier as part of the sort process. The
- * reason for this is to be able to support rows with a very large number
- * of columns.
- *
- * @param tableName The HBase table we are loading into
- * @param flatMap A flatMap function that will make every row in the RDD
- * into N cells for the bulk load
- * @param stagingDir The location on the FileSystem to bulk load into
- * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
- * column family is written
- * @param compactionExclude Compaction excluded for the HFiles
- * @param maxSize Max size for the HFiles before they roll
- */
- def hbaseBulkLoad(hc: HBaseContext,
- tableName: TableName,
- flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
- stagingDir:String,
- familyHFileWriteOptionsMap:
- util.Map[Array[Byte], FamilyHFileWriteOptions] =
- new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
- compactionExclude: Boolean = false,
- maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = {
- hc.bulkLoad(rdd, tableName,
- flatMap, stagingDir, familyHFileWriteOptionsMap,
- compactionExclude, maxSize)
- }
-
- /**
- * Implicit method that gives easy access to HBaseContext's
- * bulkLoadThinRows method.
- *
- * A Spark implementation of HBase bulk load for short rows, somewhere under
- * 1000 columns. This bulk load should be faster for tables with thinner
- * rows than the other Spark implementation of bulk load, which puts only one
- * value into a record going into a shuffle
- *
- * This will take the content from an existing RDD then sort and shuffle
- * it with respect to region splits. The result of that sort and shuffle
- * will be written to HFiles.
- *
- * After this function is executed the user will have to call
- * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
- *
- * In this implementation only the rowKey is given to the shuffle as the key
- * and all the columns are already linked to the RowKey before the shuffle
- * stage. The sorting of the qualifiers is done in memory, outside of the
- * shuffle stage
- *
- * @param tableName The HBase table we are loading into
- * @param mapFunction A function that will convert the RDD records to
- * the key value format used for the shuffle to prep
- * for writing to the bulk loaded HFiles
- * @param stagingDir The location on the FileSystem to bulk load into
- * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
- * column family is written
- * @param compactionExclude Compaction excluded for the HFiles
- * @param maxSize Max size for the HFiles before they roll
- */
- def hbaseBulkLoadThinRows(hc: HBaseContext,
- tableName: TableName,
- mapFunction: (T) =>
- (ByteArrayWrapper, FamiliesQualifiersValues),
- stagingDir:String,
- familyHFileWriteOptionsMap:
- util.Map[Array[Byte], FamilyHFileWriteOptions] =
- new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
- compactionExclude: Boolean = false,
- maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = {
- hc.bulkLoadThinRows(rdd, tableName,
- mapFunction, stagingDir, familyHFileWriteOptionsMap,
- compactionExclude, maxSize)
- }
- }
-}
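
For context, a minimal usage sketch of the implicit RDD functions removed above. It assumes an existing SparkContext named sc and an existing table "t1"; the column family "cf" and qualifier "q" are illustrative and not part of the patch.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
    import org.apache.hadoop.hbase.util.Bytes

    // Assumed: sc is an existing SparkContext and the table "t1" already exists.
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
    val rdd = sc.parallelize(Seq(("row1", "v1"), ("row2", "v2")))

    // The implicit GenericHBaseRDDFunctions wrapper adds hbaseBulkPut directly to the RDD.
    rdd.hbaseBulkPut(hbaseContext, TableName.valueOf("t1"), { case (rowKey, value) =>
      new Put(Bytes.toBytes(rowKey))
        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(value))
    })
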
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
deleted file mode 100644
index fe4b65f..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.util.Map
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.util.Pair
-import org.apache.yetus.audience.InterfaceAudience
-import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan}
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
-import org.apache.spark.api.java.function.{FlatMapFunction, Function, VoidFunction}
-import org.apache.spark.streaming.api.java.JavaDStream
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-/**
- * This is the Java Wrapper over HBaseContext which is written in
- * Scala. This class will be used by developers that want to
- * work with Spark or Spark Streaming in Java
- *
- * @param jsc This is the JavaSparkContext that we will wrap
- * @param config This is the config information for our HBase cluster
- */
-@InterfaceAudience.Public
-class JavaHBaseContext(@transient jsc: JavaSparkContext,
- @transient config: Configuration) extends Serializable {
- val hbaseContext = new HBaseContext(jsc.sc, config)
-
- /**
- * A simple enrichment of the traditional Spark javaRdd foreachPartition.
- * This function differs from the original in that it offers the
- * developer access to an already connected Connection object
- *
- * Note: Do not close the Connection object. All Connection
- * management is handled outside this method
- *
- * @param javaRdd Original javaRdd with data to iterate over
- * @param f Function to be given an iterator to iterate through
- * the RDD values and a Connection object to interact
- * with HBase
- */
- def foreachPartition[T](javaRdd: JavaRDD[T],
- f: VoidFunction[(java.util.Iterator[T], Connection)]) = {
-
- hbaseContext.foreachPartition(javaRdd.rdd,
- (it: Iterator[T], conn: Connection) => {
- f.call((it, conn))
- })
- }
-
- /**
- * A simple enrichment of the traditional Spark Streaming dStream foreach
- * This function differs from the original in that it offers the
- * developer access to an already connected Connection object
- *
- * Note: Do not close the Connection object. All Connection
- * management is handled outside this method
- *
- * @param javaDstream Original DStream with data to iterate over
- * @param f Function to be given an iterator to iterate through
- * the JavaDStream values and a Connection object to
- * interact with HBase
- */
- def foreachPartition[T](javaDstream: JavaDStream[T],
- f: VoidFunction[(Iterator[T], Connection)]) = {
- hbaseContext.foreachPartition(javaDstream.dstream,
- (it: Iterator[T], conn: Connection) => f.call(it, conn))
- }
-
- /**
- * A simple enrichment of the traditional Spark JavaRDD mapPartition.
- * This function differs from the original in that it offers the
- * developer access to an already connected Connection object
- *
- * Note: Do not close the Connection object. All Connection
- * management is handled outside this method
- *
- * Note: Make sure to partition correctly to avoid memory issues when
- * getting data from HBase
- *
- * @param javaRdd Original JavaRdd with data to iterate over
- * @param f Function to be given an iterator to iterate through
- * the RDD values and a Connection object to interact
- * with HBase
- * @return Returns a new RDD generated by the user-defined
- * function, just like a normal mapPartition
- */
- def mapPartitions[T, R](javaRdd: JavaRDD[T],
- f: FlatMapFunction[(java.util.Iterator[T],
- Connection), R]): JavaRDD[R] = {
-
- def fn = (it: Iterator[T], conn: Connection) =>
- asScalaIterator(
- f.call((asJavaIterator(it), conn)).iterator()
- )
-
- JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd,
- (iterator: Iterator[T], connection: Connection) =>
- fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R])
- }
-
- /**
- * A simple enrichment of the traditional Spark Streaming JavaDStream
- * mapPartition.
- *
- * This function differs from the original in that it offers the
- * developer access to an already connected Connection object
- *
- * Note: Do not close the Connection object. All Connection
- * management is handled outside this method
- *
- * Note: Make sure to partition correctly to avoid memory issues when
- * getting data from HBase
- *
- * @param javaDstream Original JavaDStream with data to iterate over
- * @param mp Function to be given an iterator to iterate through
- * the JavaDStream values and a Connection object to
- * interact with HBase
- * @return Returns a new JavaDStream generated by the user-defined
- * function, just like a normal mapPartition
- */
- def streamMap[T, U](javaDstream: JavaDStream[T],
- mp: Function[(Iterator[T], Connection), Iterator[U]]):
- JavaDStream[U] = {
- JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream,
- (it: Iterator[T], conn: Connection) =>
- mp.call(it, conn))(fakeClassTag[U]))(fakeClassTag[U])
- }
-
- /**
- * A simple abstraction over the HBaseContext.foreachPartition method.
- *
- * It allows a user to take a JavaRDD
- * and generate puts and send them to HBase.
- * The complexity of managing the Connection is
- * removed from the developer
- *
- * @param javaRdd Original JavaRDD with data to iterate over
- * @param tableName The name of the table to put into
- * @param f Function to convert a value in the JavaRDD
- * to a HBase Put
- */
- def bulkPut[T](javaRdd: JavaRDD[T],
- tableName: TableName,
- f: Function[(T), Put]) {
-
- hbaseContext.bulkPut(javaRdd.rdd, tableName, (t: T) => f.call(t))
- }
-
- /**
- * A simple abstraction over the HBaseContext.streamMapPartition method.
- *
- * It allows a user to take a JavaDStream and
- * generate puts and send them to HBase.
- *
- * The complexity of managing the Connection is
- * removed from the developer
- *
- * @param javaDstream Original DStream with data to iterate over
- * @param tableName The name of the table to put into
- * @param f Function to convert a value in
- * the JavaDStream to a HBase Put
- */
- def streamBulkPut[T](javaDstream: JavaDStream[T],
- tableName: TableName,
- f: Function[T, Put]) = {
- hbaseContext.streamBulkPut(javaDstream.dstream,
- tableName,
- (t: T) => f.call(t))
- }
-
- /**
- * A simple abstraction over the HBaseContext.foreachPartition method.
- *
- * It allows a user to take a JavaRDD and
- * generate Deletes and send them to HBase.
- *
- * The complexity of managing the Connection is
- * removed from the developer
- *
- * @param javaRdd Original JavaRDD with data to iterate over
- * @param tableName The name of the table to delete from
- * @param f Function to convert a value in the JavaRDD to a
- * HBase Deletes
- * @param batchSize The number of deletes to batch before sending to HBase
- */
- def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName,
- f: Function[T, Delete], batchSize: Integer) {
- hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t: T) => f.call(t), batchSize)
- }
-
- /**
- * A simple abstraction over the HBaseContext.streamBulkMutation method.
- *
- * It allows a user to take a JavaDStream and
- * generate Deletes and send them to HBase.
- *
- * The complexity of managing the Connection is
- * removed from the developer
- *
- * @param javaDStream Original DStream with data to iterate over
- * @param tableName The name of the table to delete from
- * @param f Function to convert a value in the JavaDStream to a
- * HBase Delete
- * @param batchSize The number of deletes to be sent at once
- */
- def streamBulkDelete[T](javaDStream: JavaDStream[T],
- tableName: TableName,
- f: Function[T, Delete],
- batchSize: Integer) = {
- hbaseContext.streamBulkDelete(javaDStream.dstream, tableName,
- (t: T) => f.call(t),
- batchSize)
- }
-
- /**
- * A simple abstraction over the HBaseContext.mapPartition method.
- *
- * It allows a user to take a JavaRDD and generate a
- * new RDD based on Gets and the results they bring back from HBase
- *
- * @param tableName The name of the table to get from
- * @param batchSize batch size of how many gets to retrieve in a single fetch
- * @param javaRdd Original JavaRDD with data to iterate over
- * @param makeGet Function to convert a value in the JavaRDD to a
- * HBase Get
- * @param convertResult This will convert the HBase Result object to
- * whatever the user wants to put in the resulting
- * JavaRDD
- * @return New JavaRDD that is created by the Get to HBase
- */
- def bulkGet[T, U](tableName: TableName,
- batchSize: Integer,
- javaRdd: JavaRDD[T],
- makeGet: Function[T, Get],
- convertResult: Function[Result, U]): JavaRDD[U] = {
-
- JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName,
- batchSize,
- javaRdd.rdd,
- (t: T) => makeGet.call(t),
- (r: Result) => {
- convertResult.call(r)
- })(fakeClassTag[U]))(fakeClassTag[U])
-
- }
-
- /**
- * A simple abstraction over the HBaseContext.streamMap method.
- *
- * It allows a user to take a DStream and
- * generate a new DStream based on Gets and the results
- * they bring back from HBase
- *
- * @param tableName The name of the table to get from
- * @param batchSize The number of gets to be batched together
- * @param javaDStream Original DStream with data to iterate over
- * @param makeGet Function to convert a value in the JavaDStream to a
- * HBase Get
- * @param convertResult This will convert the HBase Result object to
- * whatever the user wants to put in the resulting
- * JavaDStream
- * @return New JavaDStream that is created by the Get to HBase
- */
- def streamBulkGet[T, U](tableName: TableName,
- batchSize: Integer,
- javaDStream: JavaDStream[T],
- makeGet: Function[T, Get],
- convertResult: Function[Result, U]): JavaDStream[U] = {
- JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName,
- batchSize,
- javaDStream.dstream,
- (t: T) => makeGet.call(t),
- (r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U])
- }
-
- /**
- * A simple abstraction over the HBaseContext.bulkLoad method.
- * It allows a user to take a JavaRDD and
- * convert it into a new JavaRDD[Pair] based on the given map function,
- * and HFiles will be generated in stagingDir for bulk load
- *
- * @param javaRdd The javaRDD we are bulk loading from
- * @param tableName The HBase table we are loading into
- * @param mapFunc A Function that will convert a value in JavaRDD
- * to Pair(KeyFamilyQualifier, Array[Byte])
- * @param stagingDir The location on the FileSystem to bulk load into
- * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
- * column family is written
- * @param compactionExclude Compaction excluded for the HFiles
- * @param maxSize Max size for the HFiles before they roll
- */
- def bulkLoad[T](javaRdd: JavaRDD[T],
- tableName: TableName,
- mapFunc : Function[T, Pair[KeyFamilyQualifier, Array[Byte]]],
- stagingDir: String,
- familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
- compactionExclude: Boolean,
- maxSize: Long):
- Unit = {
- hbaseContext.bulkLoad[Pair[KeyFamilyQualifier, Array[Byte]]](javaRdd.map(mapFunc).rdd, tableName, t => {
- val keyFamilyQualifier = t.getFirst
- val value = t.getSecond
- Seq((keyFamilyQualifier, value)).iterator
- }, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
- }
-
- /**
- * A simple abstraction over the HBaseContext.bulkLoadThinRows method.
- * It allows a user to take a JavaRDD and
- * convert it into a new JavaRDD[Pair] based on the given map function,
- * and HFiles will be generated in stagingDir for bulk load
- *
- * @param javaRdd The javaRDD we are bulk loading from
- * @param tableName The HBase table we are loading into
- * @param mapFunc A Function that will convert a value in JavaRDD
- * to Pair(ByteArrayWrapper, FamiliesQualifiersValues)
- * @param stagingDir The location on the FileSystem to bulk load into
- * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
- * column family is written
- * @param compactionExclude Compaction excluded for the HFiles
- * @param maxSize Max size for the HFiles before they roll
- */
- def bulkLoadThinRows[T](javaRdd: JavaRDD[T],
- tableName: TableName,
- mapFunc : Function[T, Pair[ByteArrayWrapper, FamiliesQualifiersValues]],
- stagingDir: String,
- familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
- compactionExclude: Boolean,
- maxSize: Long):
- Unit = {
- hbaseContext.bulkLoadThinRows[Pair[ByteArrayWrapper, FamiliesQualifiersValues]](javaRdd.map(mapFunc).rdd,
- tableName, t => {
- (t.getFirst, t.getSecond)
- }, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
- }
-
- /**
- * This function will use the native HBase TableInputFormat with the
- * given scan object to generate a new JavaRDD
- *
- * @param tableName The name of the table to scan
- * @param scans The HBase scan object to use to read data from HBase
- * @param f Function to convert a Result object from HBase into
- * what the user wants in the final generated JavaRDD
- * @return New JavaRDD with results from scan
- */
- def hbaseRDD[U](tableName: TableName,
- scans: Scan,
- f: Function[(ImmutableBytesWritable, Result), U]):
- JavaRDD[U] = {
- JavaRDD.fromRDD(
- hbaseContext.hbaseRDD[U](tableName,
- scans,
- (v: (ImmutableBytesWritable, Result)) =>
- f.call(v._1, v._2))(fakeClassTag[U]))(fakeClassTag[U])
- }
-
- /**
- * An overloaded version of HBaseContext hbaseRDD that defines the
- * type of the resulting JavaRDD
- *
- * @param tableName The name of the table to scan
- * @param scans The HBase scan object to use to read data from HBase
- * @return New JavaRDD with results from scan
- */
- def hbaseRDD(tableName: TableName,
- scans: Scan):
- JavaRDD[(ImmutableBytesWritable, Result)] = {
- JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans))
- }
-
- /**
- * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
- *
- * This method is used to keep ClassTags out of the external Java API, as the Java compiler
- * cannot produce them automatically. While this ClassTag-faking does please the compiler,
- * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
- *
- * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
- * just worse performance or security issues.
- * For instance, an Array[AnyRef] can hold any type T,
- * but may lose primitive
- * specialization.
- */
- private[spark]
- def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
-
-}
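
For context, a minimal sketch of how the removed Java wrapper could be driven. It is written in Scala here for brevity, although the API targets Java callers; the JavaSparkContext jsc, the table "t1", and the column names are assumptions for illustration.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.spark.JavaHBaseContext
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.api.java.function.Function

    // Assumed: jsc is an existing JavaSparkContext.
    val javaHBaseContext = new JavaHBaseContext(jsc, HBaseConfiguration.create())
    val javaRdd = jsc.parallelize(java.util.Arrays.asList("row1", "row2"))

    // bulkPut takes a org.apache.spark.api.java.function.Function that maps each
    // record to an HBase Put; here every record becomes a single-cell Put.
    javaHBaseContext.bulkPut(javaRdd, TableName.valueOf("t1"),
      new Function[String, Put] {
        override def call(rowKey: String): Put =
          new Put(Bytes.toBytes(rowKey))
            .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"))
      })
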
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
deleted file mode 100644
index 7fd5a62..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.io.Serializable
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * This is the key to be used for sorting and shuffling.
- *
- * We will only partition on the rowKey but we will sort on all three
- *
- * @param rowKey Record RowKey
- * @param family Record ColumnFamily
- * @param qualifier Cell Qualifier
- */
-@InterfaceAudience.Public
-class KeyFamilyQualifier(val rowKey:Array[Byte], val family:Array[Byte], val qualifier:Array[Byte])
- extends Comparable[KeyFamilyQualifier] with Serializable {
- override def compareTo(o: KeyFamilyQualifier): Int = {
- var result = Bytes.compareTo(rowKey, o.rowKey)
- if (result == 0) {
- result = Bytes.compareTo(family, o.family)
- if (result == 0) result = Bytes.compareTo(qualifier, o.qualifier)
- }
- result
- }
- override def toString: String = {
- Bytes.toString(rowKey) + ":" + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
- }
-}
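
As an aside, a small sketch of the ordering the removed key provides: instances sort by rowKey, then family, then qualifier, which is the order the bulk-load HFile writer expects. The concrete values below are invented for illustration.

    import org.apache.hadoop.hbase.spark.KeyFamilyQualifier
    import org.apache.hadoop.hbase.util.Bytes

    val cells = Seq(
      new KeyFamilyQualifier(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("a")),
      new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("b")),
      new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("a")))

    // compareTo orders by rowKey, then family, then qualifier:
    // row1:cf:a, row1:cf:b, row2:cf:a
    val sorted = cells.sortWith(_.compareTo(_) < 0)
    sorted.foreach(kfq => println(kfq))  // toString prints rowKey:family:qualifier
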
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala
deleted file mode 100644
index 6d0a2d2..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.mapreduce.InputFormat
-import org.apache.spark.rdd.NewHadoopRDD
-import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
-
-@InterfaceAudience.Public
-class NewHBaseRDD[K,V](@transient sc : SparkContext,
- @transient inputFormatClass: Class[_ <: InputFormat[K, V]],
- @transient keyClass: Class[K],
- @transient valueClass: Class[V],
- @transient conf: Configuration,
- val hBaseContext: HBaseContext) extends NewHadoopRDD(sc,inputFormatClass, keyClass, valueClass, conf) {
-
- override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
- hBaseContext.applyCreds()
- super.compute(theSplit, context)
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
deleted file mode 100644
index 4602ac8..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.datasources
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.spark.hbase._
-
-/**
- * The Bound represents the boundary for the scan
- *
- * @param b The byte array of the bound
- * @param inc inclusive or not.
- */
-@InterfaceAudience.Private
-case class Bound(b: Array[Byte], inc: Boolean)
-// The non-overlapping ranges we need to scan; if lower is equal to upper, it is a get request
-
-@InterfaceAudience.Private
-case class Range(lower: Option[Bound], upper: Option[Bound])
-
-@InterfaceAudience.Private
-object Range {
- def apply(region: HBaseRegion): Range = {
- Range(region.start.map(Bound(_, true)), if (region.end.get.size == 0) {
- None
- } else {
- region.end.map((Bound(_, false)))
- })
- }
-}
-
-@InterfaceAudience.Private
-object Ranges {
- // We assume that
- // 1. r.lower.inc is true, and r.upper.inc is false
- // 2. for each range in rs, its upper.inc is false
- def and(r: Range, rs: Seq[Range]): Seq[Range] = {
- rs.flatMap{ s =>
- val lower = s.lower.map { x =>
- // the scan has lower bound
- r.lower.map { y =>
- // the region has lower bound
- if (ord.compare(x.b, y.b) < 0) {
- // scan lower bound is smaller than region server lower bound
- Some(y)
- } else {
- // scan low bound is greater or equal to region server lower bound
- Some(x)
- }
- }.getOrElse(Some(x))
- }.getOrElse(r.lower)
-
- val upper = s.upper.map { x =>
- // the scan has upper bound
- r.upper.map { y =>
- // the region has upper bound
- if (ord.compare(x.b, y.b) >= 0) {
- // scan upper bound is larger than server upper bound
- // but region server scan stop is exclusive. It is OK here.
- Some(y)
- } else {
- // scan upper bound is less or equal to region server upper bound
- Some(x)
- }
- }.getOrElse(Some(x))
- }.getOrElse(r.upper)
-
- val c = lower.map { case x =>
- upper.map { case y =>
- ord.compare(x.b, y.b)
- }.getOrElse(-1)
- }.getOrElse(-1)
- if (c < 0) {
- Some(Range(lower, upper))
- } else {
- None
- }
- }.seq
- }
-}
-
-@InterfaceAudience.Private
-object Points {
- def and(r: Range, ps: Seq[Array[Byte]]): Seq[Array[Byte]] = {
- ps.flatMap { p =>
- if (ord.compare(r.lower.get.b, p) <= 0) {
- // if region lower bound is less or equal to the point
- if (r.upper.isDefined) {
- // if region upper bound is defined
- if (ord.compare(r.upper.get.b, p) > 0) {
- // if the upper bound is greater than the point (because upper bound is exclusive)
- Some(p)
- } else {
- None
- }
- } else {
- // if the region upper bound is not defined (infinity)
- Some(p)
- }
- } else {
- None
- }
- }
- }
-}
-
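
For context, a small worked sketch of how the removed Ranges.and prunes query ranges against a region's boundaries; the row keys below are invented, and the byte ordering comes from the implicit ord in the hbase-spark package object that Ranges uses internally.

    import org.apache.hadoop.hbase.spark.datasources.{Bound, Range, Ranges}
    import org.apache.hadoop.hbase.util.Bytes

    // A region covering [b, f) ...
    val regionRange = Range(
      Some(Bound(Bytes.toBytes("b"), inc = true)),
      Some(Bound(Bytes.toBytes("f"), inc = false)))

    // ... intersected with two query ranges, [a, c) and [g, h).
    val queryRanges = Seq(
      Range(Some(Bound(Bytes.toBytes("a"), inc = true)),
            Some(Bound(Bytes.toBytes("c"), inc = false))),
      Range(Some(Bound(Bytes.toBytes("g"), inc = true)),
            Some(Bound(Bytes.toBytes("h"), inc = false))))

    // Only the overlap survives: a single range [b, c). The second query range
    // falls entirely outside the region and is dropped.
    val pruned: Seq[Range] = Ranges.and(regionRange, queryRanges)
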
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
deleted file mode 100644
index 0f467a7..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.datasources
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.spark.{HBaseConnectionKey, SmartConnection,
- HBaseConnectionCache, HBaseRelation}
-import scala.language.implicitConversions
-
-// Resource and ReferencedResources are defined for extensibility,
-// e.g., consolidate scan and bulkGet in the future work.
-
-// User has to invoke release explicitly to release the resource,
-// and potentially parent resources
-@InterfaceAudience.Private
-trait Resource {
- def release(): Unit
-}
-
-@InterfaceAudience.Private
-case class ScanResource(tbr: TableResource, rs: ResultScanner) extends Resource {
- def release() {
- rs.close()
- tbr.release()
- }
-}
-
-@InterfaceAudience.Private
-case class GetResource(tbr: TableResource, rs: Array[Result]) extends Resource {
- def release() {
- tbr.release()
- }
-}
-
-@InterfaceAudience.Private
-trait ReferencedResource {
- var count: Int = 0
- def init(): Unit
- def destroy(): Unit
- def acquire() = synchronized {
- try {
- count += 1
- if (count == 1) {
- init()
- }
- } catch {
- case e: Throwable =>
- release()
- throw e
- }
- }
-
- def release() = synchronized {
- count -= 1
- if (count == 0) {
- destroy()
- }
- }
-
- def releaseOnException[T](func: => T): T = {
- acquire()
- val ret = {
- try {
- func
- } catch {
- case e: Throwable =>
- release()
- throw e
- }
- }
- ret
- }
-}
-
-@InterfaceAudience.Private
-case class TableResource(relation: HBaseRelation) extends ReferencedResource {
- var connection: SmartConnection = _
- var table: Table = _
-
- override def init(): Unit = {
- connection = HBaseConnectionCache.getConnection(relation.hbaseConf)
- table = connection.getTable(TableName.valueOf(relation.tableName))
- }
-
- override def destroy(): Unit = {
- if (table != null) {
- table.close()
- table = null
- }
- if (connection != null) {
- connection.close()
- connection = null
- }
- }
-
- def getScanner(scan: Scan): ScanResource = releaseOnException {
- ScanResource(this, table.getScanner(scan))
- }
-
- def get(list: java.util.List[org.apache.hadoop.hbase.client.Get]) = releaseOnException {
- GetResource(this, table.get(list))
- }
-}
-
-@InterfaceAudience.Private
-case class RegionResource(relation: HBaseRelation) extends ReferencedResource {
- var connection: SmartConnection = _
- var rl: RegionLocator = _
- val regions = releaseOnException {
- val keys = rl.getStartEndKeys
- keys.getFirst.zip(keys.getSecond)
- .zipWithIndex
- .map(x =>
- HBaseRegion(x._2,
- Some(x._1._1),
- Some(x._1._2),
- Some(rl.getRegionLocation(x._1._1).getHostname)))
- }
-
- override def init(): Unit = {
- connection = HBaseConnectionCache.getConnection(relation.hbaseConf)
- rl = connection.getRegionLocator(TableName.valueOf(relation.tableName))
- }
-
- override def destroy(): Unit = {
- if (rl != null) {
- rl.close()
- rl = null
- }
- if (connection != null) {
- connection.close()
- connection = null
- }
- }
-}
-
-@InterfaceAudience.Private
-object HBaseResources{
- implicit def ScanResToScan(sr: ScanResource): ResultScanner = {
- sr.rs
- }
-
- implicit def GetResToResult(gr: GetResource): Array[Result] = {
- gr.rs
- }
-
- implicit def TableResToTable(tr: TableResource): Table = {
- tr.table
- }
-
- implicit def RegionResToRegions(rr: RegionResource): Seq[HBaseRegion] = {
- rr.regions
- }
-}
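
As an aside, a simplified standalone sketch of the reference-counting pattern that the removed ReferencedResource trait implements; the PooledThing class is invented for illustration, while the real implementations are TableResource and RegionResource.

    // Simplified version of the ReferencedResource pattern: init() lazily on the
    // first acquire, destroy() when the count drops back to zero, and
    // releaseOnException() to avoid leaking a reference if the body throws.
    trait RefCounted {
      private var count = 0
      protected def init(): Unit
      protected def destroy(): Unit

      def acquire(): Unit = synchronized {
        count += 1
        if (count == 1) init()
      }

      def release(): Unit = synchronized {
        count -= 1
        if (count == 0) destroy()
      }

      def releaseOnException[T](body: => T): T = {
        acquire()
        try body
        catch { case e: Throwable => release(); throw e }
      }
    }

    // Hypothetical resource for the example; prints instead of opening a real connection.
    class PooledThing extends RefCounted {
      protected def init(): Unit = println("opening connection")
      protected def destroy(): Unit = println("closing connection")
    }
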
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
deleted file mode 100644
index dc497f9..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.datasources
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * This is the HBase configuration. Users can either set these options in SparkConf, which
- * will take effect globally, or configure them per table, which will override the values
- * set in SparkConf. If not set, the default values will take effect.
- */
-@InterfaceAudience.Public
-object HBaseSparkConf{
- /** Set to false to disable server-side caching of blocks for this scan,
- * false by default, since full table scans generate too much block cache churn.
- */
- val QUERY_CACHEBLOCKS = "hbase.spark.query.cacheblocks"
- val DEFAULT_QUERY_CACHEBLOCKS = false
- /** The number of rows for caching that will be passed to scan. */
- val QUERY_CACHEDROWS = "hbase.spark.query.cachedrows"
- /** Set the maximum number of values to return for each call to next() in scan. */
- val QUERY_BATCHSIZE = "hbase.spark.query.batchsize"
- /** The number of gets sent to HBase in a single bulk-get batch. */
- val BULKGET_SIZE = "hbase.spark.bulkget.size"
- val DEFAULT_BULKGET_SIZE = 1000
- /** Set to specify the location of the hbase configuration file. */
- val HBASE_CONFIG_LOCATION = "hbase.spark.config.location"
- /** Set to specify whether to create a new HBaseContext or use the latest cached one. */
- val USE_HBASECONTEXT = "hbase.spark.use.hbasecontext"
- val DEFAULT_USE_HBASECONTEXT = true
- /** Push down the filter to the data source engine to increase the performance of queries. */
- val PUSHDOWN_COLUMN_FILTER = "hbase.spark.pushdown.columnfilter"
- val DEFAULT_PUSHDOWN_COLUMN_FILTER= true
- /** Class name of the encoder, which encodes data types from Spark to HBase bytes. */
- val QUERY_ENCODER = "hbase.spark.query.encoder"
- val DEFAULT_QUERY_ENCODER = classOf[NaiveEncoder].getCanonicalName
- /** The timestamp used to filter columns with a specific timestamp. */
- val TIMESTAMP = "hbase.spark.query.timestamp"
- /** The starting timestamp used to filter columns with a specific range of versions. */
- val TIMERANGE_START = "hbase.spark.query.timerange.start"
- /** The ending timestamp used to filter columns with a specific range of versions. */
- val TIMERANGE_END = "hbase.spark.query.timerange.end"
- /** The maximum number of versions to return. */
- val MAX_VERSIONS = "hbase.spark.query.maxVersions"
- /** The delay, in milliseconds, before closing an hbase-spark connection once there are no more references to it. */
- val DEFAULT_CONNECTION_CLOSE_DELAY = 10 * 60 * 1000
-}
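
For context, a hedged sketch of how these options were typically supplied: globally through SparkConf or per table through DataFrame read options, with the per-table values taking precedence. The "catalog" key and its JSON payload are placeholders, and the data source name refers to the DefaultSource removed elsewhere in this patch.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Global defaults via SparkConf.
    val sparkConf = new SparkConf()
      .setAppName("hbase-spark-conf-example")
      .set("hbase.spark.query.cacheblocks", "false")
      .set("hbase.spark.bulkget.size", "500")

    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Per-table overrides via read options.
    val df = sqlContext.read
      .format("org.apache.hadoop.hbase.spark")
      .option("hbase.spark.use.hbasecontext", "false")
      .option("hbase.spark.pushdown.columnfilter", "true")
      .option("catalog", "{...}")  // placeholder for the table catalog JSON
      .load()
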
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
deleted file mode 100644
index 1ca1b45..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.datasources
-
-import java.util.ArrayList
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.spark._
-import org.apache.hadoop.hbase.spark.hbase._
-import org.apache.hadoop.hbase.spark.datasources.HBaseResources._
-import org.apache.hadoop.hbase.util.ShutdownHookManager
-import org.apache.spark.sql.datasources.hbase.Field
-import org.apache.spark.{SparkEnv, TaskContext, Logging, Partition}
-import org.apache.spark.rdd.RDD
-
-import scala.collection.mutable
-
-@InterfaceAudience.Private
-class HBaseTableScanRDD(relation: HBaseRelation,
- val hbaseContext: HBaseContext,
- @transient val filter: Option[SparkSQLPushDownFilter] = None,
- val columns: Seq[Field] = Seq.empty
- )extends RDD[Result](relation.sqlContext.sparkContext, Nil) with Logging {
- private def sparkConf = SparkEnv.get.conf
- @transient var ranges = Seq.empty[Range]
- @transient var points = Seq.empty[Array[Byte]]
- def addPoint(p: Array[Byte]) {
- points :+= p
- }
-
- def addRange(r: ScanRange) = {
- val lower = if (r.lowerBound != null && r.lowerBound.length > 0) {
- Some(Bound(r.lowerBound, r.isLowerBoundEqualTo))
- } else {
- None
- }
- val upper = if (r.upperBound != null && r.upperBound.length > 0) {
- if (!r.isUpperBoundEqualTo) {
- Some(Bound(r.upperBound, false))
- } else {
-
- // HBase stopRow is exclusive: therefore it DOESN'T act like isUpperBoundEqualTo
- // by default. So we need to add a new max byte to the stopRow key
- val newArray = new Array[Byte](r.upperBound.length + 1)
- System.arraycopy(r.upperBound, 0, newArray, 0, r.upperBound.length)
-
- //New Max Bytes
- newArray(r.upperBound.length) = ByteMin
- Some(Bound(newArray, false))
- }
- } else {
- None
- }
- ranges :+= Range(lower, upper)
- }
-
- override def getPartitions: Array[Partition] = {
- val regions = RegionResource(relation)
- var idx = 0
- logDebug(s"There are ${regions.size} regions")
- val ps = regions.flatMap { x =>
- val rs = Ranges.and(Range(x), ranges)
- val ps = Points.and(Range(x), points)
- if (rs.size > 0 || ps.size > 0) {
- if(log.isDebugEnabled) {
- rs.foreach(x => logDebug(x.toString))
- }
- idx += 1
- Some(HBaseScanPartition(idx - 1, x, rs, ps, SerializedFilter.toSerializedTypedFilter(filter)))
- } else {
- None
- }
- }.toArray
- regions.release()
- ShutdownHookManager.affixShutdownHook( new Thread() {
- override def run() {
- HBaseConnectionCache.close()
- }
- }, 0)
- ps.asInstanceOf[Array[Partition]]
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- split.asInstanceOf[HBaseScanPartition].regions.server.map {
- identity
- }.toSeq
- }
-
- private def buildGets(
- tbr: TableResource,
- g: Seq[Array[Byte]],
- filter: Option[SparkSQLPushDownFilter],
- columns: Seq[Field],
- hbaseContext: HBaseContext): Iterator[Result] = {
- g.grouped(relation.bulkGetSize).flatMap{ x =>
- val gets = new ArrayList[Get](x.size)
- x.foreach{ y =>
- val g = new Get(y)
- handleTimeSemantics(g)
- columns.foreach { d =>
- if (!d.isRowKey) {
- g.addColumn(d.cfBytes, d.colBytes)
- }
- }
- filter.foreach(g.setFilter(_))
- gets.add(g)
- }
- hbaseContext.applyCreds()
- val tmp = tbr.get(gets)
- rddResources.addResource(tmp)
- toResultIterator(tmp)
- }
- }
-
- private def toResultIterator(result: GetResource): Iterator[Result] = {
- val iterator = new Iterator[Result] {
- var idx = 0
- var cur: Option[Result] = None
- override def hasNext: Boolean = {
- while(idx < result.length && cur.isEmpty) {
- val r = result(idx)
- idx += 1
- if (!r.isEmpty) {
- cur = Some(r)
- }
- }
- if (cur.isEmpty) {
- rddResources.release(result)
- }
- cur.isDefined
- }
- override def next(): Result = {
- hasNext
- val ret = cur.get
- cur = None
- ret
- }
- }
- iterator
- }
-
- private def buildScan(range: Range,
- filter: Option[SparkSQLPushDownFilter],
- columns: Seq[Field]): Scan = {
- val scan = (range.lower, range.upper) match {
- case (Some(Bound(a, b)), Some(Bound(c, d))) => new Scan(a, c)
- case (None, Some(Bound(c, d))) => new Scan(Array[Byte](), c)
- case (Some(Bound(a, b)), None) => new Scan(a)
- case (None, None) => new Scan()
- }
- handleTimeSemantics(scan)
-
- columns.foreach { d =>
- if (!d.isRowKey) {
- scan.addColumn(d.cfBytes, d.colBytes)
- }
- }
- scan.setCacheBlocks(relation.blockCacheEnable)
- scan.setBatch(relation.batchNum)
- scan.setCaching(relation.cacheSize)
- filter.foreach(scan.setFilter(_))
- scan
- }
- private def toResultIterator(scanner: ScanResource): Iterator[Result] = {
- val iterator = new Iterator[Result] {
- var cur: Option[Result] = None
- override def hasNext: Boolean = {
- if (cur.isEmpty) {
- val r = scanner.next()
- if (r == null) {
- rddResources.release(scanner)
- } else {
- cur = Some(r)
- }
- }
- cur.isDefined
- }
- override def next(): Result = {
- hasNext
- val ret = cur.get
- cur = None
- ret
- }
- }
- iterator
- }
-
- lazy val rddResources = RDDResources(new mutable.HashSet[Resource]())
-
- private def close() {
- rddResources.release()
- }
-
- override def compute(split: Partition, context: TaskContext): Iterator[Result] = {
- val partition = split.asInstanceOf[HBaseScanPartition]
- val filter = SerializedFilter.fromSerializedFilter(partition.sf)
- val scans = partition.scanRanges
- .map(buildScan(_, filter, columns))
- val tableResource = TableResource(relation)
- context.addTaskCompletionListener(context => close())
- val points = partition.points
- val gIt: Iterator[Result] = {
- if (points.isEmpty) {
- Iterator.empty: Iterator[Result]
- } else {
- buildGets(tableResource, points, filter, columns, hbaseContext)
- }
- }
- val rIts = scans.par
- .map { scan =>
- hbaseContext.applyCreds()
- val scanner = tableResource.getScanner(scan)
- rddResources.addResource(scanner)
- scanner
- }.map(toResultIterator(_))
- .fold(Iterator.empty: Iterator[Result]){ case (x, y) =>
- x ++ y
- } ++ gIt
- ShutdownHookManager.affixShutdownHook( new Thread() {
- override def run() {
- HBaseConnectionCache.close()
- }
- }, 0)
- rIts
- }
-
- private def handleTimeSemantics(query: Query): Unit = {
- // Set timestamp related values if present
- (query, relation.timestamp, relation.minTimestamp, relation.maxTimestamp) match {
- case (q: Scan, Some(ts), None, None) => q.setTimeStamp(ts)
- case (q: Get, Some(ts), None, None) => q.setTimeStamp(ts)
-
- case (q:Scan, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
- case (q:Get, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
-
- case (q, None, None, None) =>
-
- case _ => throw new IllegalArgumentException(s"Invalid combination of query/timestamp/time range provided. " +
- s"timeStamp is: ${relation.timestamp.get}, minTimeStamp is: ${relation.minTimestamp.get}, " +
- s"maxTimeStamp is: ${relation.maxTimestamp.get}")
- }
- if (relation.maxVersions.isDefined) {
- query match {
- case q: Scan => q.setMaxVersions(relation.maxVersions.get)
- case q: Get => q.setMaxVersions(relation.maxVersions.get)
- case _ => throw new IllegalArgumentException("Invalid query provided with maxVersions")
- }
- }
- }
-}
-
-case class SerializedFilter(b: Option[Array[Byte]])
-
-object SerializedFilter {
- def toSerializedTypedFilter(f: Option[SparkSQLPushDownFilter]): SerializedFilter = {
- SerializedFilter(f.map(_.toByteArray))
- }
-
- def fromSerializedFilter(sf: SerializedFilter): Option[SparkSQLPushDownFilter] = {
- sf.b.map(SparkSQLPushDownFilter.parseFrom(_))
- }
-}
-
-private[hbase] case class HBaseRegion(
- override val index: Int,
- val start: Option[HBaseType] = None,
- val end: Option[HBaseType] = None,
- val server: Option[String] = None) extends Partition
-
-
-private[hbase] case class HBaseScanPartition(
- override val index: Int,
- val regions: HBaseRegion,
- val scanRanges: Seq[Range],
- val points: Seq[Array[Byte]],
- val sf: SerializedFilter) extends Partition
-
-case class RDDResources(set: mutable.HashSet[Resource]) {
- def addResource(s: Resource) {
- set += s
- }
- def release() {
- set.foreach(release(_))
- }
- def release(rs: Resource) {
- try {
- rs.release()
- } finally {
- set.remove(rs)
- }
- }
-}
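
As an aside, a simplified standalone sketch of the iterator-wrapping pattern used by toResultIterator above: a pull-style source is adapted to a Scala Iterator, and the underlying resource is released once the source reports exhaustion. The fetch function and release hook below are illustrative stand-ins for ResultScanner.next() and RDDResources.release.

    // Adapts a next()-style source (which returns null when exhausted) into a
    // Scala Iterator, invoking the release hook when the end is reached.
    def toIterator[A >: Null](fetch: () => A, onExhausted: () => Unit): Iterator[A] =
      new Iterator[A] {
        private var cur: Option[A] = None
        override def hasNext: Boolean = {
          if (cur.isEmpty) {
            val r = fetch()
            if (r == null) onExhausted() else cur = Some(r)
          }
          cur.isDefined
        }
        override def next(): A = {
          hasNext
          val ret = cur.get
          cur = None
          ret
        }
      }

    // Example with a mutable queue standing in for a ResultScanner.
    val data = scala.collection.mutable.Queue("r1", "r2", "r3")
    val it = toIterator[String](
      () => if (data.isEmpty) null else data.dequeue(),
      () => println("scanner closed"))
    println(it.toList)  // prints "scanner closed" followed by List(r1, r2, r3)
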
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala
deleted file mode 100644
index 6a50189..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.datasources
-
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.Logging
-import org.apache.spark.sql.types._
-
-/**
- * The ranges for a data type whose size is known. Whether the bound is inclusive
- * or exclusive is undefined, and it is up to the caller to decide.
- *
- * @param low: the lower bound of the range.
- * @param upper: the upper bound of the range.
- */
-@InterfaceAudience.LimitedPrivate(Array(HBaseInterfaceAudience.SPARK))
-@InterfaceStability.Evolving
-case class BoundRange(low: Array[Byte],upper: Array[Byte])
-
-/**
- * The class identifies the ranges for a Java primitive type. The caller needs
- * to decide on its own whether each bound is inclusive or exclusive.
- *
- * @param less: the set of ranges for LessThan/LessOrEqualThan
- * @param greater: the set of ranges for GreaterThan/GreaterThanOrEqualTo
- * @param value: the byte array of the original value
- */
-@InterfaceAudience.LimitedPrivate(Array(HBaseInterfaceAudience.SPARK))
-@InterfaceStability.Evolving
-case class BoundRanges(less: Array[BoundRange], greater: Array[BoundRange], value: Array[Byte])
-
-/**
- * The trait to support plugin architecture for different encoder/decoder.
- * encode is used for serializing the data type to byte array and the filter is
- * used to filter out the unnecessary records.
- */
-@InterfaceAudience.LimitedPrivate(Array(HBaseInterfaceAudience.SPARK))
-@InterfaceStability.Evolving
-trait BytesEncoder {
- def encode(dt: DataType, value: Any): Array[Byte]
-
- /**
- * The function performing real filtering operations. The format of filterBytes depends on the
- * implementation of the BytesEncoder.
- *
- * @param input: the current input byte array that needs to be filtered out
- * @param offset1: the starting offset of the input byte array.
- * @param length1: the length of the input byte array.
- * @param filterBytes: the byte array provided by query condition.
- * @param offset2: the starting offset in the filterBytes.
- * @param length2: the length of the bytes in the filterBytes
- * @param ops: The operation of the filter operator.
- * @return true: the record satisfies the predicates
- * false: the record does not satisfy the predicates.
- */
- def filter(input: Array[Byte], offset1: Int, length1: Int,
- filterBytes: Array[Byte], offset2: Int, length2: Int,
- ops: JavaBytesEncoder): Boolean
-
- /**
- * Currently, it is used for partition pruning.
- * For some codecs, the order of a java primitive type may be inconsistent
- * with the order of its byte array representation, so a predicate on some
- * java primitive types may have to be split into multiple predicates.
- *
- * For example, in the naive codec a predicate on some java primitive types
- * has to be split into multiple predicates whose union is then taken so
- * that the predicate is evaluated correctly.
- * For example, if we have "COLUMN < 2", we will transform it into
- * "0 <= COLUMN < 2 OR Integer.MIN_VALUE <= COLUMN <= -1"
- */
- def ranges(in: Any): Option[BoundRanges]
-}
-
-@InterfaceAudience.LimitedPrivate(Array(HBaseInterfaceAudience.SPARK))
-@InterfaceStability.Evolving
-object JavaBytesEncoder extends Enumeration with Logging{
- type JavaBytesEncoder = Value
- val Greater, GreaterEqual, Less, LessEqual, Equal, Unknown = Value
-
- /**
- * create the encoder/decoder
- *
- * @param clsName: the class name of the encoder/decoder class
- * @return the instance of the encoder plugin.
- */
- def create(clsName: String): BytesEncoder = {
- try {
- Class.forName(clsName).newInstance.asInstanceOf[BytesEncoder]
- } catch {
- case _: Throwable =>
- logWarning(s"$clsName cannot be initiated, falling back to naive encoder")
- new NaiveEncoder()
- }
- }
-}
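The BytesEncoder trait and JavaBytesEncoder.create above are the plugin point for swapping codecs. A minimal sketch of how a caller could exercise that plugin point, assuming a hypothetical implementation class com.example.CustomEncoder on the classpath; only create(), encode() and the fallback behaviour come from the file removed above:

    import org.apache.hadoop.hbase.spark.datasources.{BytesEncoder, JavaBytesEncoder}
    import org.apache.spark.sql.types.IntegerType

    // Instantiate the encoder by class name. "com.example.CustomEncoder" is a
    // hypothetical plugin; if it cannot be instantiated, create() logs a warning
    // and falls back to the built-in NaiveEncoder, so a usable encoder is always returned.
    val encoder: BytesEncoder = JavaBytesEncoder.create("com.example.CustomEncoder")

    // Serialize a filter value the way the data source does when pushing down a predicate.
    val filterBytes: Array[Byte] = encoder.encode(IntegerType, 42)

Each pushed-down comparison is then evaluated cell by cell through encoder.filter(), with the operator passed as one of the JavaBytesEncoder enumeration values (Greater, GreaterEqual, Less, LessEqual, Equal).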
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala
deleted file mode 100644
index 6138242..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala
+++ /dev/null
@@ -1,261 +0,0 @@
-package org.apache.hadoop.hbase.spark.datasources
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder
-import org.apache.hadoop.hbase.spark.hbase._
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.Logging
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-
-/**
- * This is the naive, non-order-preserving encoder/decoder.
- * Because the order of java primitive types is inconsistent with the order
- * of their byte arrays, the data type has to be passed in so that the filter
- * can work correctly; this is done by storing the type in the first byte
- * of the serialized array.
- */
-@InterfaceAudience.Private
-class NaiveEncoder extends BytesEncoder with Logging{
- var code = 0
- def nextCode: Byte = {
- code += 1
- (code - 1).asInstanceOf[Byte]
- }
- val BooleanEnc = nextCode
- val ShortEnc = nextCode
- val IntEnc = nextCode
- val LongEnc = nextCode
- val FloatEnc = nextCode
- val DoubleEnc = nextCode
- val StringEnc = nextCode
- val BinaryEnc = nextCode
- val TimestampEnc = nextCode
- val UnknownEnc = nextCode
-
-
- /**
- * Evaluate the java primitive type and return the BoundRanges. A single value may have
- * multiple output ranges because the order of a java primitive type is inconsistent
- * with its byte array order.
- *
- * For short, integer, and long, the numeric order is consistent with the byte array order
- * when two numbers have the same sign bit, but negative numbers are larger than positive
- * numbers in byte array order.
- *
- * For double and float, the order of positive numbers is consistent with their byte array
- * order, but the order of negative numbers is the reverse of the byte array order. Please
- * refer to IEEE-754 and https://en.wikipedia.org/wiki/Single-precision_floating-point_format
- */
- def ranges(in: Any): Option[BoundRanges] = in match {
- case a: Integer =>
- val b = Bytes.toBytes(a)
- if (a >= 0) {
- logDebug(s"range is 0 to $a and ${Integer.MIN_VALUE} to -1")
- Some(BoundRanges(
- Array(BoundRange(Bytes.toBytes(0: Int), b),
- BoundRange(Bytes.toBytes(Integer.MIN_VALUE), Bytes.toBytes(-1: Int))),
- Array(BoundRange(b, Bytes.toBytes(Integer.MAX_VALUE))), b))
- } else {
- Some(BoundRanges(
- Array(BoundRange(Bytes.toBytes(Integer.MIN_VALUE), b)),
- Array(BoundRange(b, Bytes.toBytes(-1: Integer)),
- BoundRange(Bytes.toBytes(0: Int), Bytes.toBytes(Integer.MAX_VALUE))), b))
- }
- case a: Long =>
- val b = Bytes.toBytes(a)
- if (a >= 0) {
- Some(BoundRanges(
- Array(BoundRange(Bytes.toBytes(0: Long), b),
- BoundRange(Bytes.toBytes(Long.MinValue), Bytes.toBytes(-1: Long))),
- Array(BoundRange(b, Bytes.toBytes(Long.MaxValue))), b))
- } else {
- Some(BoundRanges(
- Array(BoundRange(Bytes.toBytes(Long.MinValue), b)),
- Array(BoundRange(b, Bytes.toBytes(-1: Long)),
- BoundRange(Bytes.toBytes(0: Long), Bytes.toBytes(Long.MaxValue))), b))
- }
- case a: Short =>
- val b = Bytes.toBytes(a)
- if (a >= 0) {
- Some(BoundRanges(
- Array(BoundRange(Bytes.toBytes(0: Short), b),
- BoundRange(Bytes.toBytes(Short.MinValue), Bytes.toBytes(-1: Short))),
- Array(BoundRange(b, Bytes.toBytes(Short.MaxValue))), b))
- } else {
- Some(BoundRanges(
- Array(BoundRange(Bytes.toBytes(Short.MinValue), b)),
- Array(BoundRange(b, Bytes.toBytes(-1: Short)),
- BoundRange(Bytes.toBytes(0: Short), Bytes.toBytes(Short.MaxValue))), b))
- }
- case a: Double =>
- val b = Bytes.toBytes(a)
- if (a >= 0.0d) {
- Some(BoundRanges(
- Array(BoundRange(Bytes.toBytes(0.0d), b),
- BoundRange(Bytes.toBytes(-0.0d), Bytes.toBytes(Double.MinValue))),
- Array(BoundRange(b, Bytes.toBytes(Double.MaxValue))), b))
- } else {
- Some(BoundRanges(
- Array(BoundRange(b, Bytes.toBytes(Double.MinValue))),
- Array(BoundRange(Bytes.toBytes(-0.0d), b),
- BoundRange(Bytes.toBytes(0.0d), Bytes.toBytes(Double.MaxValue))), b))
- }
- case a: Float =>
- val b = Bytes.toBytes(a)
- if (a >= 0.0f) {
- Some(BoundRanges(
- Array(BoundRange(Bytes.toBytes(0.0f), b),
- BoundRange(Bytes.toBytes(-0.0f), Bytes.toBytes(Float.MinValue))),
- Array(BoundRange(b, Bytes.toBytes(Float.MaxValue))), b))
- } else {
- Some(BoundRanges(
- Array(BoundRange(b, Bytes.toBytes(Float.MinValue))),
- Array(BoundRange(Bytes.toBytes(-0.0f), b),
- BoundRange(Bytes.toBytes(0.0f), Bytes.toBytes(Float.MaxValue))), b))
- }
- case a: Array[Byte] =>
- Some(BoundRanges(
- Array(BoundRange(bytesMin, a)),
- Array(BoundRange(a, bytesMax)), a))
- case a: Byte =>
- val b = Array(a)
- Some(BoundRanges(
- Array(BoundRange(bytesMin, b)),
- Array(BoundRange(b, bytesMax)), b))
- case a: String =>
- val b = Bytes.toBytes(a)
- Some(BoundRanges(
- Array(BoundRange(bytesMin, b)),
- Array(BoundRange(b, bytesMax)), b))
- case a: UTF8String =>
- val b = a.getBytes
- Some(BoundRanges(
- Array(BoundRange(bytesMin, b)),
- Array(BoundRange(b, bytesMax)), b))
- case _ => None
- }
-
- def compare(c: Int, ops: JavaBytesEncoder): Boolean = {
- ops match {
- case JavaBytesEncoder.Greater => c > 0
- case JavaBytesEncoder.GreaterEqual => c >= 0
- case JavaBytesEncoder.Less => c < 0
- case JavaBytesEncoder.LessEqual => c <= 0
- }
- }
-
- /**
- * Encode the value of the given data type into a byte array. Note that it is a naive
- * implementation, with the data type byte prepended to the head of the serialized byte array.
- *
- * @param dt: The data type of the input
- * @param value: the value of the input
- * @return the byte array with the first byte indicating the data type.
- */
- override def encode(dt: DataType,
- value: Any): Array[Byte] = {
- dt match {
- case BooleanType =>
- val result = new Array[Byte](Bytes.SIZEOF_BOOLEAN + 1)
- result(0) = BooleanEnc
- value.asInstanceOf[Boolean] match {
- case true => result(1) = -1: Byte
- case false => result(1) = 0: Byte
- }
- result
- case ShortType =>
- val result = new Array[Byte](Bytes.SIZEOF_SHORT + 1)
- result(0) = ShortEnc
- Bytes.putShort(result, 1, value.asInstanceOf[Short])
- result
- case IntegerType =>
- val result = new Array[Byte](Bytes.SIZEOF_INT + 1)
- result(0) = IntEnc
- Bytes.putInt(result, 1, value.asInstanceOf[Int])
- result
- case LongType|TimestampType =>
- val result = new Array[Byte](Bytes.SIZEOF_LONG + 1)
- result(0) = LongEnc
- Bytes.putLong(result, 1, value.asInstanceOf[Long])
- result
- case FloatType =>
- val result = new Array[Byte](Bytes.SIZEOF_FLOAT + 1)
- result(0) = FloatEnc
- Bytes.putFloat(result, 1, value.asInstanceOf[Float])
- result
- case DoubleType =>
- val result = new Array[Byte](Bytes.SIZEOF_DOUBLE + 1)
- result(0) = DoubleEnc
- Bytes.putDouble(result, 1, value.asInstanceOf[Double])
- result
- case BinaryType =>
- val v = value.asInstanceOf[Array[Byte]]
- val result = new Array[Byte](v.length + 1)
- result(0) = BinaryEnc
- System.arraycopy(v, 0, result, 1, v.length)
- result
- case StringType =>
- val bytes = Bytes.toBytes(value.asInstanceOf[String])
- val result = new Array[Byte](bytes.length + 1)
- result(0) = StringEnc
- System.arraycopy(bytes, 0, result, 1, bytes.length)
- result
- case _ =>
- val bytes = Bytes.toBytes(value.toString)
- val result = new Array[Byte](bytes.length + 1)
- result(0) = UnknownEnc
- System.arraycopy(bytes, 0, result, 1, bytes.length)
- result
- }
- }
-
- override def filter(input: Array[Byte], offset1: Int, length1: Int,
- filterBytes: Array[Byte], offset2: Int, length2: Int,
- ops: JavaBytesEncoder): Boolean = {
- filterBytes(offset2) match {
- case ShortEnc =>
- val in = Bytes.toShort(input, offset1)
- val value = Bytes.toShort(filterBytes, offset2 + 1)
- compare(in.compareTo(value), ops)
- case IntEnc =>
- val in = Bytes.toInt(input, offset1)
- val value = Bytes.toInt(filterBytes, offset2 + 1)
- compare(in.compareTo(value), ops)
- case LongEnc | TimestampEnc =>
- val in = Bytes.toLong(input, offset1)
- val value = Bytes.toLong(filterBytes, offset2 + 1)
- compare(in.compareTo(value), ops)
- case FloatEnc =>
- val in = Bytes.toFloat(input, offset1)
- val value = Bytes.toFloat(filterBytes, offset2 + 1)
- compare(in.compareTo(value), ops)
- case DoubleEnc =>
- val in = Bytes.toDouble(input, offset1)
- val value = Bytes.toDouble(filterBytes, offset2 + 1)
- compare(in.compareTo(value), ops)
- case _ =>
- // for String, Byte, Binary, Boolean and other types
- // we can use the order of byte array directly.
- compare(
- Bytes.compareTo(input, offset1, length1, filterBytes, offset2 + 1, length2 - 1), ops)
- }
- }
-}
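To make the partition-pruning note in the ranges() scaladoc concrete, here is a minimal sketch of the split that the non-negative Integer branch above produces for the predicate "COLUMN < 2"; the variable names are illustrative, and only Bytes from hbase-common is assumed:

    import org.apache.hadoop.hbase.util.Bytes

    // With this naive codec, serialized ints do not sort like ints: negative values
    // compare greater than positive ones when the byte arrays are compared as
    // unsigned bytes. "COLUMN < 2" therefore becomes the union of two byte ranges.
    val b = Bytes.toBytes(2)
    val lessRanges = Array(
      (Bytes.toBytes(0), b),                                 // 0 <= COLUMN < 2
      (Bytes.toBytes(Integer.MIN_VALUE), Bytes.toBytes(-1))) // Integer.MIN_VALUE <= COLUMN <= -1

Each pair can back a separate scan range, and the results of the two ranges are unioned to answer the original predicate, which is the behaviour the scaladoc describes.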