Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/09 04:59:52 UTC

[6/9] hbase git commit: HBASE-18817 pull the hbase-spark module out of branch-2.

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
deleted file mode 100644
index 2469c8e..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.util
-
-import org.apache.hadoop.hbase.{HConstants, TableName}
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.spark.rdd.RDD
-
-import scala.reflect.ClassTag
-
-/**
- * HBaseRDDFunctions contains a set of implicit functions that can be
- * applied to a Spark RDD so that we can easily interact with HBase
- */
-@InterfaceAudience.Public
-object HBaseRDDFunctions
-{
-
-  /**
-   * These are implicit methods for an RDD that contains any type of
-   * data.
-   *
-   * @param rdd This is for rdd of any type
-   * @tparam T  This is any type
-   */
-  implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) {
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * put.  This will not return a new RDD.  Think of it like a foreach
-     *
-     * @param hc         The hbaseContext object to identify which
-     *                   HBase cluster connection to use
-     * @param tableName  The tableName that the put will be sent to
-     * @param f          The function that will turn the RDD values
-     *                   into HBase Put objects.
-     */
-    def hbaseBulkPut(hc: HBaseContext,
-                     tableName: TableName,
-                     f: (T) => Put): Unit = {
-      hc.bulkPut(rdd, tableName, f)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * get.  This will return a new RDD.  Think of it as an RDD map
-     * function, in that every RDD value will get a new value out of
-     * HBase.  That new value will populate the newly generated RDD.
-     *
-     * @param hc             The hbaseContext object to identify which
-     *                       HBase cluster connection to use
-     * @param tableName      The tableName that the gets will be sent to
-     * @param batchSize      How many gets to execute in a single batch
-     * @param f              The function that will turn the RDD values
-     *                       into HBase Get objects
-     * @param convertResult  The function that will convert a HBase
-     *                       Result object into a value that will go
-     *                       into the resulting RDD
-     * @tparam R             The type of Object that will be coming
-     *                       out of the resulting RDD
-     * @return               A resulting RDD with type R objects
-     */
-    def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
-                            tableName: TableName, batchSize:Int,
-                            f: (T) => Get, convertResult: (Result) => R): RDD[R] = {
-      hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * get.  This will return a new RDD.  Think of it as an RDD map
-     * function, in that every RDD value will get a new value out of
-     * HBase.  That new value will populate the newly generated RDD.
-     *
-     * @param hc             The hbaseContext object to identify which
-     *                       HBase cluster connection to use
-     * @param tableName      The tableName that the gets will be sent to
-     * @param batchSize      How many gets to execute in a single batch
-     * @param f              The function that will turn the RDD values
-     *                       into HBase Get objects
-     * @return               A resulting RDD with type R objects
-     */
-    def hbaseBulkGet(hc: HBaseContext,
-                                  tableName: TableName, batchSize:Int,
-                                  f: (T) => Get): RDD[(ImmutableBytesWritable, Result)] = {
-      hc.bulkGet[T, (ImmutableBytesWritable, Result)](tableName,
-        batchSize, rdd, f,
-        result => if (result != null && result.getRow != null) {
-          (new ImmutableBytesWritable(result.getRow), result)
-        } else {
-          null
-        })
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * Delete.  This will not return a new RDD.
-     *
-     * @param hc         The hbaseContext object to identify which HBase
-     *                   cluster connection to use
-     * @param tableName  The tableName that the deletes will be sent to
-     * @param f          The function that will convert the RDD value into
-     *                   a HBase Delete Object
-     * @param batchSize  The number of Deletes to be sent in a single batch
-     */
-    def hbaseBulkDelete(hc: HBaseContext,
-                        tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = {
-      hc.bulkDelete(rdd, tableName, f, batchSize)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's
-     * foreachPartition method.  This will act very much like a normal RDD
-     * foreach method, except that you will now have an HBase connection
-     * while iterating through the values.
-     *
-     * @param hc  The hbaseContext object to identify which HBase
-     *            cluster connection to use
-     * @param f   This function will get an iterator for a Partition of an
-     *            RDD along with a connection object to HBase
-     */
-    def hbaseForeachPartition(hc: HBaseContext,
-                              f: (Iterator[T], Connection) => Unit): Unit = {
-      hc.foreachPartition(rdd, f)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's
-     * mapPartitions method.  This will act very much like a normal RDD
-     * mapPartitions call, except that you will now have an
-     * HBase connection while iterating through the values.
-     *
-     * @param hc  The hbaseContext object to identify which HBase
-     *            cluster connection to use
-     * @param f   This function will get an iterator for a Partition of an
-     *            RDD along with a connection object to HBase
-     * @tparam R  This is the type of objects that will go into the resulting
-     *            RDD
-     * @return    A resulting RDD of type R
-     */
-    def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
-                                        f: (Iterator[T], Connection) => Iterator[R]):
-    RDD[R] = {
-      hc.mapPartitions[T,R](rdd, f)
-    }
-
-    /**
-     * Spark Implementation of HBase Bulk load for wide rows or when
-     * values are not already combined at the time of the map process
-     *
-     * A Spark Implementation of HBase Bulk load
-     *
-     * This will take the content from an existing RDD then sort and shuffle
-     * it with respect to region splits.  The result of that sort and shuffle
-     * will be written to HFiles.
-     *
-     * After this function is executed the user will have to call
-     * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
-     *
-     * Also note this version of bulk load is different from past versions in
-     * that it includes the qualifier as part of the sort process. The
-     * reason for this is to be able to support rows with a very large number
-     * of columns.
-     *
-     * @param tableName                      The HBase table we are loading into
-     * @param flatMap                        A flatMap function that will make every row in the RDD
-     *                                       into N cells for the bulk load
-     * @param stagingDir                     The location on the FileSystem to bulk load into
-     * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
-     *                                       column family is written
-     * @param compactionExclude              Compaction excluded for the HFiles
-     * @param maxSize                        Max size for the HFiles before they roll
-     */
-    def hbaseBulkLoad(hc: HBaseContext,
-                         tableName: TableName,
-                         flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
-                         stagingDir:String,
-                         familyHFileWriteOptionsMap:
-                         util.Map[Array[Byte], FamilyHFileWriteOptions] =
-                         new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
-                         compactionExclude: Boolean = false,
-                         maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = {
-      hc.bulkLoad(rdd, tableName,
-        flatMap, stagingDir, familyHFileWriteOptionsMap,
-        compactionExclude, maxSize)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's
-     * bulkLoadThinRows method.
-     *
-     * Spark Implementation of HBase Bulk load for short rows, somewhere under
-     * 1000 columns.  This bulk load should be faster for tables with thinner
-     * rows than the other Spark implementation of bulk load, which puts only one
-     * value into a record going into a shuffle
-     *
-     * This will take the content from an existing RDD then sort and shuffle
-     * it with respect to region splits.  The result of that sort and shuffle
-     * will be written to HFiles.
-     *
-     * After this function is executed the user will have to call
-     * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
-     *
-     * In this implementation only the rowKey is given to the shuffle as the key
-     * and all the columns are already linked to the RowKey before the shuffle
-     * stage.  The sorting of the qualifier is done in memory outside of the
-     * shuffle stage
-     *
-     * @param tableName                      The HBase table we are loading into
-     * @param mapFunction                    A function that will convert the RDD records to
-     *                                       the key value format used for the shuffle to prep
-     *                                       for writing to the bulk loaded HFiles
-     * @param stagingDir                     The location on the FileSystem to bulk load into
-     * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
-     *                                       column family is written
-     * @param compactionExclude              Compaction excluded for the HFiles
-     * @param maxSize                        Max size for the HFiles before they roll
-     */
-    def hbaseBulkLoadThinRows(hc: HBaseContext,
-                      tableName: TableName,
-                      mapFunction: (T) =>
-                        (ByteArrayWrapper, FamiliesQualifiersValues),
-                      stagingDir:String,
-                      familyHFileWriteOptionsMap:
-                      util.Map[Array[Byte], FamilyHFileWriteOptions] =
-                      new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
-                      compactionExclude: Boolean = false,
-                      maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = {
-      hc.bulkLoadThinRows(rdd, tableName,
-        mapFunction, stagingDir, familyHFileWriteOptionsMap,
-        compactionExclude, maxSize)
-    }
-  }
-}
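
For context on what callers lose with this removal, here is a minimal usage sketch of the implicit API deleted above, written against the module as it existed before this commit. The table name, column family/qualifier, and the in-scope rdd and hbaseContext values are illustrative assumptions, not part of the original file:

  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.{Get, Put, Result}
  import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
  import org.apache.hadoop.hbase.util.Bytes

  // rdd: RDD[Array[Byte]] of row keys; hbaseContext: a previously created HBaseContext
  rdd.hbaseBulkPut(hbaseContext, TableName.valueOf("t"),
    (rowKey: Array[Byte]) =>
      new Put(rowKey).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")))

  // Batched gets; each Result is converted into the value stored in the resulting RDD[String]
  val values = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf("t"), 100,
    (rowKey: Array[Byte]) => new Get(rowKey),
    (result: Result) => Bytes.toString(result.value()))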

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
deleted file mode 100644
index fe4b65f..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.util.Map
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.util.Pair
-import org.apache.yetus.audience.InterfaceAudience
-import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan}
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
-import org.apache.spark.api.java.function.{FlatMapFunction, Function, VoidFunction}
-import org.apache.spark.streaming.api.java.JavaDStream
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-/**
- * This is the Java Wrapper over HBaseContext which is written in
- * Scala.  This class will be used by developers who want to
- * work with Spark or Spark Streaming in Java
- *
- * @param jsc    This is the JavaSparkContext that we will wrap
- * @param config This is the config information for our HBase cluster
- */
-@InterfaceAudience.Public
-class JavaHBaseContext(@transient jsc: JavaSparkContext,
-                       @transient config: Configuration) extends Serializable {
-  val hbaseContext = new HBaseContext(jsc.sc, config)
-
-  /**
-   * A simple enrichment of the traditional Spark javaRdd foreachPartition.
-   * This function differs from the original in that it offers the
-   * developer access to an already connected Connection object
-   *
-   * Note: Do not close the Connection object.  All Connection
-   * management is handled outside this method
-   *
-   * @param javaRdd Original javaRdd with data to iterate over
-   * @param f       Function to be given an iterator to iterate through
-   *                the RDD values and a Connection object to interact
-   *                with HBase
-   */
-  def foreachPartition[T](javaRdd: JavaRDD[T],
-                          f: VoidFunction[(java.util.Iterator[T], Connection)]) = {
-
-    hbaseContext.foreachPartition(javaRdd.rdd,
-      (it: Iterator[T], conn: Connection) => {
-        f.call((it, conn))
-      })
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark Streaming dStream foreach
-   * This function differs from the original in that it offers the
-   * developer access to an already connected Connection object
-   *
-   * Note: Do not close the Connection object.  All Connection
-   * management is handled outside this method
-   *
-   * @param javaDstream Original DStream with data to iterate over
-   * @param f           Function to be given an iterator to iterate through
-   *                    the JavaDStream values and a Connection object to
-   *                    interact with HBase
-   */
-  def foreachPartition[T](javaDstream: JavaDStream[T],
-                          f: VoidFunction[(Iterator[T], Connection)]) = {
-    hbaseContext.foreachPartition(javaDstream.dstream,
-      (it: Iterator[T], conn: Connection) => f.call(it, conn))
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark JavaRDD mapPartition.
-   * This function differs from the original in that it offers the
-   * developer access to an already connected Connection object
-   *
-   * Note: Do not close the Connection object.  All Connection
-   * management is handled outside this method
-   *
-   * Note: Make sure to partition correctly to avoid memory issues when
-   * getting data from HBase
-   *
-   * @param javaRdd Original JavaRdd with data to iterate over
-   * @param f       Function to be given an iterator to iterate through
-   *                the RDD values and a Connection object to interact
-   *                with HBase
-   * @return        Returns a new RDD generated by the user definition
-   *                function just like normal mapPartition
-   */
-  def mapPartitions[T, R](javaRdd: JavaRDD[T],
-                          f: FlatMapFunction[(java.util.Iterator[T],
-                            Connection), R]): JavaRDD[R] = {
-
-    def fn = (it: Iterator[T], conn: Connection) =>
-      asScalaIterator(
-        f.call((asJavaIterator(it), conn)).iterator()
-      )
-
-    JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd,
-      (iterator: Iterator[T], connection: Connection) =>
-        fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R])
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark Streaming JavaDStream
-   * mapPartition.
-   *
-   * This function differs from the original in that it offers the
-   * developer access to an already connected Connection object
-   *
-   * Note: Do not close the Connection object.  All Connection
-   * management is handled outside this method
-   *
-   * Note: Make sure to partition correctly to avoid memory issues when
-   * getting data from HBase
-   *
-   * @param javaDstream Original JavaDStream with data to iterate over
-   * @param mp          Function to be given an iterator to iterate through
-   *                    the JavaDStream values and a Connection object to
-   *                    interact with HBase
-   * @return            Returns a new JavaDStream generated by the user
-   *                    definition function just like normal mapPartition
-   */
-  def streamMap[T, U](javaDstream: JavaDStream[T],
-                      mp: Function[(Iterator[T], Connection), Iterator[U]]):
-  JavaDStream[U] = {
-    JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream,
-      (it: Iterator[T], conn: Connection) =>
-        mp.call(it, conn))(fakeClassTag[U]))(fakeClassTag[U])
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.foreachPartition method.
-   *
-   * It allows a user to take a JavaRDD,
-   * generate Puts, and send them to HBase.
-   * The complexity of managing the Connection is
-   * removed from the developer
-   *
-   * @param javaRdd   Original JavaRDD with data to iterate over
-   * @param tableName The name of the table to put into
-   * @param f         Function to convert a value in the JavaRDD
-   *                  to an HBase Put
-   */
-  def bulkPut[T](javaRdd: JavaRDD[T],
-                 tableName: TableName,
-                 f: Function[(T), Put]) {
-
-    hbaseContext.bulkPut(javaRdd.rdd, tableName, (t: T) => f.call(t))
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.streamMapPartition method.
-   *
-   * It allows a user to take a JavaDStream,
-   * generate Puts, and send them to HBase.
-   *
-   * The complexity of managing the Connection is
-   * removed from the developer
-   *
-   * @param javaDstream Original DStream with data to iterate over
-   * @param tableName   The name of the table to put into
-   * @param f           Function to convert a value in
-   *                    the JavaDStream to a HBase Put
-   */
-  def streamBulkPut[T](javaDstream: JavaDStream[T],
-                       tableName: TableName,
-                       f: Function[T, Put]) = {
-    hbaseContext.streamBulkPut(javaDstream.dstream,
-      tableName,
-      (t: T) => f.call(t))
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.foreachPartition method.
-   *
-   * It allows a user to take a JavaRDD,
-   * generate Deletes, and send them to HBase.
-   *
-   * The complexity of managing the Connection is
-   * removed from the developer
-   *
-   * @param javaRdd   Original JavaRDD with data to iterate over
-   * @param tableName The name of the table to delete from
-   * @param f         Function to convert a value in the JavaRDD to an
-   *                  HBase Delete
-   * @param batchSize The number of deletes to batch before sending to HBase
-   */
-  def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName,
-                    f: Function[T, Delete], batchSize: Integer) {
-    hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t: T) => f.call(t), batchSize)
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.streamBulkMutation method.
-   *
-   * It allows a user to take a JavaDStream,
-   * generate Deletes, and send them to HBase.
-   *
-   * The complexity of managing the Connection is
-   * removed from the developer
-   *
-   * @param javaDStream Original DStream with data to iterate over
-   * @param tableName   The name of the table to delete from
-   * @param f           Function to convert a value in the JavaDStream to an
-   *                    HBase Delete
-   * @param batchSize   The number of deletes to be sent at once
-   */
-  def streamBulkDelete[T](javaDStream: JavaDStream[T],
-                          tableName: TableName,
-                          f: Function[T, Delete],
-                          batchSize: Integer) = {
-    hbaseContext.streamBulkDelete(javaDStream.dstream, tableName,
-      (t: T) => f.call(t),
-      batchSize)
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.mapPartition method.
-   *
-   * It allows a user to take a JavaRDD and generate a
-   * new RDD based on Gets and the results they bring back from HBase
-   *
-   * @param tableName     The name of the table to get from
-   * @param batchSize     batch size of how many gets to retrieve in a single fetch
-   * @param javaRdd       Original JavaRDD with data to iterate over
-   * @param makeGet       Function to convert a value in the JavaRDD to an
-   *                      HBase Get
-   * @param convertResult This will convert the HBase Result object to
-   *                      whatever the user wants to put in the resulting
-   *                      JavaRDD
-   * @return              New JavaRDD that is created by the Get to HBase
-   */
-  def bulkGet[T, U](tableName: TableName,
-                    batchSize: Integer,
-                    javaRdd: JavaRDD[T],
-                    makeGet: Function[T, Get],
-                    convertResult: Function[Result, U]): JavaRDD[U] = {
-
-    JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName,
-      batchSize,
-      javaRdd.rdd,
-      (t: T) => makeGet.call(t),
-      (r: Result) => {
-        convertResult.call(r)
-      })(fakeClassTag[U]))(fakeClassTag[U])
-
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.streamMap method.
-   *
-   * It allows a user to take a DStream and
-   * generate a new DStream based on Gets and the results
-   * they bring back from HBase
-   *
-   * @param tableName     The name of the table to get from
-   * @param batchSize     The number of gets to be batched together
-   * @param javaDStream   Original DStream with data to iterate over
-   * @param makeGet       Function to convert a value in the JavaDStream to an
-   *                      HBase Get
-   * @param convertResult This will convert the HBase Result object to
-   *                      whatever the user wants to put in the resulting
-   *                      JavaDStream
-   * @return              New JavaDStream that is created by the Get to HBase
-   */
-  def streamBulkGet[T, U](tableName: TableName,
-                          batchSize: Integer,
-                          javaDStream: JavaDStream[T],
-                          makeGet: Function[T, Get],
-                          convertResult: Function[Result, U]): JavaDStream[U] = {
-    JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName,
-      batchSize,
-      javaDStream.dstream,
-      (t: T) => makeGet.call(t),
-      (r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U])
-  }
-
-  /**
-    * A simple abstraction over the HBaseContext.bulkLoad method.
-    * It allows a user to take a JavaRDD and
-    * convert it into a new JavaRDD[Pair] using the given map function;
-    * HFiles will be generated in stagingDir for bulk load
-    *
-    * @param javaRdd                        The javaRDD we are bulk loading from
-    * @param tableName                      The HBase table we are loading into
-    * @param mapFunc                        A Function that will convert a value in JavaRDD
-    *                                       to Pair(KeyFamilyQualifier, Array[Byte])
-    * @param stagingDir                     The location on the FileSystem to bulk load into
-    * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
-    *                                       column family is written
-    * @param compactionExclude              Compaction excluded for the HFiles
-    * @param maxSize                        Max size for the HFiles before they roll
-    */
-  def bulkLoad[T](javaRdd: JavaRDD[T],
-                  tableName: TableName,
-                  mapFunc : Function[T, Pair[KeyFamilyQualifier, Array[Byte]]],
-                  stagingDir: String,
-                  familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
-                  compactionExclude: Boolean,
-                  maxSize: Long):
-  Unit = {
-    hbaseContext.bulkLoad[Pair[KeyFamilyQualifier, Array[Byte]]](javaRdd.map(mapFunc).rdd, tableName, t => {
-      val keyFamilyQualifier = t.getFirst
-      val value = t.getSecond
-      Seq((keyFamilyQualifier, value)).iterator
-    }, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
-  }
-
-  /**
-    * A simple abstraction over the HBaseContext.bulkLoadThinRows method.
-    * It allows a user to take a JavaRDD and
-    * convert it into a new JavaRDD[Pair] using the given map function;
-    * HFiles will be generated in stagingDir for bulk load
-    *
-    * @param javaRdd                        The javaRDD we are bulk loading from
-    * @param tableName                      The HBase table we are loading into
-    * @param mapFunc                        A Function that will convert a value in JavaRDD
-    *                                       to Pair(ByteArrayWrapper, FamiliesQualifiersValues)
-    * @param stagingDir                     The location on the FileSystem to bulk load into
-    * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
-    *                                       column family is written
-    * @param compactionExclude              Compaction excluded for the HFiles
-    * @param maxSize                        Max size for the HFiles before they roll
-    */
-  def bulkLoadThinRows[T](javaRdd: JavaRDD[T],
-                       tableName: TableName,
-                       mapFunc : Function[T, Pair[ByteArrayWrapper, FamiliesQualifiersValues]],
-                       stagingDir: String,
-                       familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
-                       compactionExclude: Boolean,
-                       maxSize: Long):
-  Unit = {
-    hbaseContext.bulkLoadThinRows[Pair[ByteArrayWrapper, FamiliesQualifiersValues]](javaRdd.map(mapFunc).rdd,
-      tableName, t => {
-      (t.getFirst, t.getSecond)
-    }, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
-  }
-
-  /**
-   * This function will use the native HBase TableInputFormat with the
-   * given scan object to generate a new JavaRDD
-   *
-   * @param tableName The name of the table to scan
-   * @param scans     The HBase scan object to use to read data from HBase
-   * @param f         Function to convert a Result object from HBase into
-   *                  what the user wants in the final generated JavaRDD
-   * @return          New JavaRDD with results from scan
-   */
-  def hbaseRDD[U](tableName: TableName,
-                  scans: Scan,
-                  f: Function[(ImmutableBytesWritable, Result), U]):
-  JavaRDD[U] = {
-    JavaRDD.fromRDD(
-      hbaseContext.hbaseRDD[U](tableName,
-        scans,
-        (v: (ImmutableBytesWritable, Result)) =>
-          f.call(v._1, v._2))(fakeClassTag[U]))(fakeClassTag[U])
-  }
-
-  /**
-   * An overloaded version of HBaseContext hbaseRDD that defines the
-   * type of the resulting JavaRDD
-   *
-   * @param tableName The name of the table to scan
-   * @param scans     The HBase scan object to use to read data from HBase
-   * @return          New JavaRDD with results from scan
-   */
-  def hbaseRDD(tableName: TableName,
-               scans: Scan):
-  JavaRDD[(ImmutableBytesWritable, Result)] = {
-    JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans))
-  }
-
-  /**
-   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
-   *
-   * This method is used to keep ClassTags out of the external Java API, as the Java compiler
-   * cannot produce them automatically. While this ClassTag-faking does please the compiler,
-   * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
-   *
-   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
-   * just worse performance or security issues.
-   * For instance, an Array[AnyRef] can hold any type T,
-   * but may lose primitive
-   * specialization.
-   */
-  private[spark]
-  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
-
-}
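
A short sketch of driving this Java wrapper, kept in Scala to match the rest of the module; jsc, conf and javaRdd are assumed to already exist, and the table and column names are illustrative:

  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.Put
  import org.apache.hadoop.hbase.spark.JavaHBaseContext
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.api.java.function.Function

  val javaHBaseContext = new JavaHBaseContext(jsc, conf)

  // Each element of the JavaRDD[Array[Byte]] of row keys becomes one Put
  javaHBaseContext.bulkPut(javaRdd, TableName.valueOf("t"),
    new Function[Array[Byte], Put] {
      def call(rowKey: Array[Byte]): Put =
        new Put(rowKey).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), rowKey)
    })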

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
deleted file mode 100644
index 7fd5a62..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.io.Serializable
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * This is the key to be used for sorting and shuffling.
- *
- * We will only partition on the rowKey but we will sort on all three
- *
- * @param rowKey    Record RowKey
- * @param family    Record ColumnFamily
- * @param qualifier Cell Qualifier
- */
-@InterfaceAudience.Public
-class KeyFamilyQualifier(val rowKey:Array[Byte], val family:Array[Byte], val qualifier:Array[Byte])
-  extends Comparable[KeyFamilyQualifier] with Serializable {
-  override def compareTo(o: KeyFamilyQualifier): Int = {
-    var result = Bytes.compareTo(rowKey, o.rowKey)
-    if (result == 0) {
-      result = Bytes.compareTo(family, o.family)
-      if (result == 0) result = Bytes.compareTo(qualifier, o.qualifier)
-    }
-    result
-  }
-  override def toString: String = {
-    Bytes.toString(rowKey) + ":" + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
-  }
-}
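
A quick illustration of the ordering this key imposes (values made up): the row key is compared first, then the column family, then the qualifier.

  import org.apache.hadoop.hbase.spark.KeyFamilyQualifier
  import org.apache.hadoop.hbase.util.Bytes

  val a = new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("q1"))
  val b = new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("q2"))
  val c = new KeyFamilyQualifier(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("q1"))

  assert(a.compareTo(b) < 0)  // same row and family, so the qualifier decides
  assert(b.compareTo(c) < 0)  // different rows, so the row key decides
  println(a)                  // prints "row1:cf:q1"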

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala
deleted file mode 100644
index 6d0a2d2..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.mapreduce.InputFormat
-import org.apache.spark.rdd.NewHadoopRDD
-import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
-
-@InterfaceAudience.Public
-class NewHBaseRDD[K,V](@transient sc : SparkContext,
-                       @transient inputFormatClass: Class[_ <: InputFormat[K, V]],
-                       @transient keyClass: Class[K],
-                       @transient valueClass: Class[V],
-                   @transient conf: Configuration,
-                   val hBaseContext: HBaseContext) extends NewHadoopRDD(sc,inputFormatClass, keyClass, valueClass, conf) {
-
-  override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
-    hBaseContext.applyCreds()
-    super.compute(theSplit, context)
-  }
-}
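
Roughly, HBaseContext.hbaseRDD built instances of this class over TableInputFormat so that credentials were applied on each executor before scanning; a hedged construction sketch, with sc, conf and hbaseContext assumed in scope and conf already carrying the input table and scan settings:

  import org.apache.hadoop.hbase.client.Result
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  import org.apache.hadoop.hbase.spark.NewHBaseRDD

  // compute() calls hBaseContext.applyCreds() before delegating to NewHadoopRDD
  val rdd = new NewHBaseRDD(sc,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result],
    conf,
    hbaseContext)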

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
deleted file mode 100644
index 4602ac8..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.datasources
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.spark.hbase._
-
-/**
- * The Bound represents the boundary of the scan
- *
- * @param b The byte array of the bound
- * @param inc inclusive or not.
- */
-@InterfaceAudience.Private
-case class Bound(b: Array[Byte], inc: Boolean)
-// The non-overlapping ranges we need to scan; if lower equals upper, it is a get request
-
-@InterfaceAudience.Private
-case class Range(lower: Option[Bound], upper: Option[Bound])
-
-@InterfaceAudience.Private
-object Range {
-  def apply(region: HBaseRegion): Range = {
-    Range(region.start.map(Bound(_, true)), if (region.end.get.size == 0) {
-      None
-    } else {
-      region.end.map((Bound(_, false)))
-    })
-  }
-}
-
-@InterfaceAudience.Private
-object Ranges {
-  // We assume that
-  // 1. r.lower.inc is true, and r.upper.inc is false
-  // 2. for each range in rs, its upper.inc is false
-  def and(r: Range, rs: Seq[Range]): Seq[Range] = {
-    rs.flatMap{ s =>
-      val lower = s.lower.map { x =>
-        // the scan has lower bound
-        r.lower.map { y =>
-          // the region has lower bound
-          if (ord.compare(x.b, y.b) < 0) {
-            // scan lower bound is smaller than region server lower bound
-            Some(y)
-          } else {
-            // scan low bound is greater or equal to region server lower bound
-            Some(x)
-          }
-        }.getOrElse(Some(x))
-      }.getOrElse(r.lower)
-
-      val upper =  s.upper.map { x =>
-        // the scan has upper bound
-        r.upper.map { y =>
-          // the region has upper bound
-          if (ord.compare(x.b, y.b) >= 0) {
-            // scan upper bound is larger than server upper bound
-            // but region server scan stop is exclusive. It is OK here.
-            Some(y)
-          } else {
-            // scan upper bound is less or equal to region server upper bound
-            Some(x)
-          }
-        }.getOrElse(Some(x))
-      }.getOrElse(r.upper)
-
-      val c = lower.map { case x =>
-        upper.map { case y =>
-          ord.compare(x.b, y.b)
-        }.getOrElse(-1)
-      }.getOrElse(-1)
-      if (c < 0) {
-        Some(Range(lower, upper))
-      } else {
-        None
-      }
-    }.seq
-  }
-}
-
-@InterfaceAudience.Private
-object Points {
-  def and(r: Range, ps: Seq[Array[Byte]]): Seq[Array[Byte]] = {
-    ps.flatMap { p =>
-      if (ord.compare(r.lower.get.b, p) <= 0) {
-        // if region lower bound is less or equal to the point
-        if (r.upper.isDefined) {
-          // if region upper bound is defined
-          if (ord.compare(r.upper.get.b, p) > 0) {
-            // if the upper bound is greater than the point (because upper bound is exclusive)
-            Some(p)
-          } else {
-            None
-          }
-        } else {
-          // if the region upper bound is not defined (infinity)
-          Some(p)
-        }
-      } else {
-        None
-      }
-    }
-  }
-}
-
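
To make the pruning above concrete, a small hedged example of what Ranges.and computes; the byte values are illustrative and ord is the byte-array ordering supplied by the org.apache.hadoop.hbase.spark.hbase package object:

  import org.apache.hadoop.hbase.spark.datasources.{Bound, Range, Ranges}
  import org.apache.hadoop.hbase.util.Bytes

  // A region covering ["a", "m") ...
  val region = Range(Some(Bound(Bytes.toBytes("a"), true)), Some(Bound(Bytes.toBytes("m"), false)))
  // ... intersected with a pushed-down scan range ["c", "z") ...
  val scans = Seq(Range(Some(Bound(Bytes.toBytes("c"), true)), Some(Bound(Bytes.toBytes("z"), false))))

  // ... yields the clipped range ["c", "m"): the tighter lower and upper bounds win
  val clipped = Ranges.and(region, scans)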

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
deleted file mode 100644
index 0f467a7..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.datasources
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.spark.{HBaseConnectionKey, SmartConnection,
-  HBaseConnectionCache, HBaseRelation}
-import scala.language.implicitConversions
-
-// Resource and ReferencedResource are defined for extensibility,
-// e.g., to consolidate scan and bulkGet in future work.
-
-// User has to invoke release explicitly to release the resource,
-// and potentially parent resources
-@InterfaceAudience.Private
-trait Resource {
-  def release(): Unit
-}
-
-@InterfaceAudience.Private
-case class ScanResource(tbr: TableResource, rs: ResultScanner) extends Resource {
-  def release() {
-    rs.close()
-    tbr.release()
-  }
-}
-
-@InterfaceAudience.Private
-case class GetResource(tbr: TableResource, rs: Array[Result]) extends Resource {
-  def release() {
-    tbr.release()
-  }
-}
-
-@InterfaceAudience.Private
-trait ReferencedResource {
-  var count: Int = 0
-  def init(): Unit
-  def destroy(): Unit
-  def acquire() = synchronized {
-    try {
-      count += 1
-      if (count == 1) {
-        init()
-      }
-    } catch {
-      case e: Throwable =>
-        release()
-        throw e
-    }
-  }
-
-  def release() = synchronized {
-    count -= 1
-    if (count == 0) {
-      destroy()
-    }
-  }
-
-  def releaseOnException[T](func: => T): T = {
-    acquire()
-    val ret = {
-      try {
-        func
-      } catch {
-        case e: Throwable =>
-          release()
-          throw e
-      }
-    }
-    ret
-  }
-}
-
-@InterfaceAudience.Private
-case class TableResource(relation: HBaseRelation) extends ReferencedResource {
-  var connection: SmartConnection = _
-  var table: Table = _
-
-  override def init(): Unit = {
-    connection = HBaseConnectionCache.getConnection(relation.hbaseConf)
-    table = connection.getTable(TableName.valueOf(relation.tableName))
-  }
-
-  override def destroy(): Unit = {
-    if (table != null) {
-      table.close()
-      table = null
-    }
-    if (connection != null) {
-      connection.close()
-      connection = null
-    }
-  }
-
-  def getScanner(scan: Scan): ScanResource = releaseOnException {
-    ScanResource(this, table.getScanner(scan))
-  }
-
-  def get(list: java.util.List[org.apache.hadoop.hbase.client.Get]) = releaseOnException {
-    GetResource(this, table.get(list))
-  }
-}
-
-@InterfaceAudience.Private
-case class RegionResource(relation: HBaseRelation) extends ReferencedResource {
-  var connection: SmartConnection = _
-  var rl: RegionLocator = _
-  val regions = releaseOnException {
-    val keys = rl.getStartEndKeys
-    keys.getFirst.zip(keys.getSecond)
-      .zipWithIndex
-      .map(x =>
-      HBaseRegion(x._2,
-        Some(x._1._1),
-        Some(x._1._2),
-        Some(rl.getRegionLocation(x._1._1).getHostname)))
-  }
-
-  override def init(): Unit = {
-    connection = HBaseConnectionCache.getConnection(relation.hbaseConf)
-    rl = connection.getRegionLocator(TableName.valueOf(relation.tableName))
-  }
-
-  override def destroy(): Unit = {
-    if (rl != null) {
-      rl.close()
-      rl = null
-    }
-    if (connection != null) {
-      connection.close()
-      connection = null
-    }
-  }
-}
-
-@InterfaceAudience.Private
-object HBaseResources{
-  implicit def ScanResToScan(sr: ScanResource): ResultScanner = {
-    sr.rs
-  }
-
-  implicit def GetResToResult(gr: GetResource): Array[Result] = {
-    gr.rs
-  }
-
-  implicit def TableResToTable(tr: TableResource): Table = {
-    tr.table
-  }
-
-  implicit def RegionResToRegions(rr: RegionResource): Seq[HBaseRegion] = {
-    rr.regions
-  }
-}
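
A hedged sketch of the acquire/release discipline these wrappers expect (relation is assumed to be an HBaseRelation already in scope); the implicit conversions above let a resource stand in for the underlying HBase object:

  import org.apache.hadoop.hbase.client.Scan
  import org.apache.hadoop.hbase.spark.datasources.TableResource
  import org.apache.hadoop.hbase.spark.datasources.HBaseResources._
  import org.apache.hadoop.hbase.util.Bytes

  val tableResource = TableResource(relation)          // connection and table opened on first acquire
  val scanResource = tableResource.getScanner(new Scan())
  try {
    var result = scanResource.next()                   // ScanResToScan: used here as a ResultScanner
    while (result != null) {
      println(Bytes.toString(result.getRow))
      result = scanResource.next()
    }
  } finally {
    scanResource.release()                             // closes the scanner, then releases the table
  }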

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
deleted file mode 100644
index dc497f9..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.datasources
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * This is the HBase configuration. Users can either set these options in SparkConf, which
- * will take effect globally, or configure them per table, which will override the values
- * set in SparkConf. If an option is not set, its default value will take effect.
- */
-@InterfaceAudience.Public
-object HBaseSparkConf{
-  /** Set to false to disable server-side caching of blocks for this scan,
-   *  false by default, since full table scans generate too much block cache churn.
-   */
-  val QUERY_CACHEBLOCKS = "hbase.spark.query.cacheblocks"
-  val DEFAULT_QUERY_CACHEBLOCKS = false
-  /** The number of rows for caching that will be passed to scan. */
-  val QUERY_CACHEDROWS = "hbase.spark.query.cachedrows"
-  /** Set the maximum number of values to return for each call to next() in scan. */
-  val QUERY_BATCHSIZE = "hbase.spark.query.batchsize"
-  /** The number of Gets sent to HBase in a single bulk-get batch. */
-  val BULKGET_SIZE = "hbase.spark.bulkget.size"
-  val DEFAULT_BULKGET_SIZE = 1000
-  /** Set to specify the location of the hbase configuration file. */
-  val HBASE_CONFIG_LOCATION = "hbase.spark.config.location"
-  /** Set to specify whether to create a new HBaseContext or use the latest cached one. */
-  val USE_HBASECONTEXT = "hbase.spark.use.hbasecontext"
-  val DEFAULT_USE_HBASECONTEXT = true
-  /** Push the filter down to the data source engine to increase query performance. */
-  val PUSHDOWN_COLUMN_FILTER = "hbase.spark.pushdown.columnfilter"
-  val DEFAULT_PUSHDOWN_COLUMN_FILTER= true
-  /** Class name of the encoder, which encodes data types from Spark to HBase bytes. */
-  val QUERY_ENCODER = "hbase.spark.query.encoder"
-  val DEFAULT_QUERY_ENCODER = classOf[NaiveEncoder].getCanonicalName
-  /** The timestamp used to filter columns with a specific timestamp. */
-  val TIMESTAMP = "hbase.spark.query.timestamp"
-  /** The starting timestamp used to filter columns with a specific range of versions. */
-  val TIMERANGE_START = "hbase.spark.query.timerange.start"
-  /** The ending timestamp used to filter columns with a specific range of versions. */
-  val TIMERANGE_END =  "hbase.spark.query.timerange.end"
-  /** The maximum number of versions to return. */
-  val MAX_VERSIONS = "hbase.spark.query.maxVersions"
-  /** The delay, in milliseconds, before closing an hbase-spark connection that has no remaining references. */
-  val DEFAULT_CONNECTION_CLOSE_DELAY = 10 * 60 * 1000
-}
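
These keys were normally passed as per-table options on the SQL data source; a hedged sketch, where sqlContext and the catalogJson mapping are assumed to exist and HBaseTableCatalog.tableCatalog is the catalog key defined elsewhere in this module:

  import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
  import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

  val df = sqlContext.read
    .options(Map(
      HBaseTableCatalog.tableCatalog -> catalogJson,     // table/column mapping
      HBaseSparkConf.QUERY_CACHEBLOCKS -> "true",        // per-table override of the global setting
      HBaseSparkConf.BULKGET_SIZE -> "500",
      HBaseSparkConf.MAX_VERSIONS -> "3"))
    .format("org.apache.hadoop.hbase.spark")
    .load()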

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
deleted file mode 100644
index 1ca1b45..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.datasources
-
-import java.util.ArrayList
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.spark._
-import org.apache.hadoop.hbase.spark.hbase._
-import org.apache.hadoop.hbase.spark.datasources.HBaseResources._
-import org.apache.hadoop.hbase.util.ShutdownHookManager
-import org.apache.spark.sql.datasources.hbase.Field
-import org.apache.spark.{SparkEnv, TaskContext, Logging, Partition}
-import org.apache.spark.rdd.RDD
-
-import scala.collection.mutable
-
-@InterfaceAudience.Private
-class HBaseTableScanRDD(relation: HBaseRelation,
-                       val hbaseContext: HBaseContext,
-                       @transient val filter: Option[SparkSQLPushDownFilter] = None,
-                        val columns: Seq[Field] = Seq.empty
-     )extends RDD[Result](relation.sqlContext.sparkContext, Nil) with Logging  {
-  private def sparkConf = SparkEnv.get.conf
-  @transient var ranges = Seq.empty[Range]
-  @transient var points = Seq.empty[Array[Byte]]
-  def addPoint(p: Array[Byte]) {
-    points :+= p
-  }
-
-  def addRange(r: ScanRange) = {
-    val lower = if (r.lowerBound != null && r.lowerBound.length > 0) {
-      Some(Bound(r.lowerBound, r.isLowerBoundEqualTo))
-    } else {
-      None
-    }
-    val upper = if (r.upperBound != null && r.upperBound.length > 0) {
-      if (!r.isUpperBoundEqualTo) {
-        Some(Bound(r.upperBound, false))
-      } else {
-
-        // HBase stopRow is exclusive: therefore it DOESN'T act like isUpperBoundEqualTo
-        // by default.  So we need to add a new max byte to the stopRow key
-        val newArray = new Array[Byte](r.upperBound.length + 1)
-        System.arraycopy(r.upperBound, 0, newArray, 0, r.upperBound.length)
-
-        //New Max Bytes
-        newArray(r.upperBound.length) = ByteMin
-        Some(Bound(newArray, false))
-      }
-    } else {
-      None
-    }
-    ranges :+= Range(lower, upper)
-  }
-
-  override def getPartitions: Array[Partition] = {
-    val regions = RegionResource(relation)
-    var idx = 0
-    logDebug(s"There are ${regions.size} regions")
-    val ps = regions.flatMap { x =>
-      val rs = Ranges.and(Range(x), ranges)
-      val ps = Points.and(Range(x), points)
-      if (rs.size > 0 || ps.size > 0) {
-        if(log.isDebugEnabled) {
-          rs.foreach(x => logDebug(x.toString))
-        }
-        idx += 1
-        Some(HBaseScanPartition(idx - 1, x, rs, ps, SerializedFilter.toSerializedTypedFilter(filter)))
-      } else {
-        None
-      }
-    }.toArray
-    regions.release()
-    ShutdownHookManager.affixShutdownHook( new Thread() {
-      override def run() {
-        HBaseConnectionCache.close()
-      }
-    }, 0)
-    ps.asInstanceOf[Array[Partition]]
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    split.asInstanceOf[HBaseScanPartition].regions.server.map {
-      identity
-    }.toSeq
-  }
-
-  private def buildGets(
-      tbr: TableResource,
-      g: Seq[Array[Byte]],
-      filter: Option[SparkSQLPushDownFilter],
-      columns: Seq[Field],
-      hbaseContext: HBaseContext): Iterator[Result] = {
-    g.grouped(relation.bulkGetSize).flatMap{ x =>
-      val gets = new ArrayList[Get](x.size)
-      x.foreach{ y =>
-        val g = new Get(y)
-        handleTimeSemantics(g)
-        columns.foreach { d =>
-          if (!d.isRowKey) {
-            g.addColumn(d.cfBytes, d.colBytes)
-          }
-        }
-        filter.foreach(g.setFilter(_))
-        gets.add(g)
-      }
-      hbaseContext.applyCreds()
-      val tmp = tbr.get(gets)
-      rddResources.addResource(tmp)
-      toResultIterator(tmp)
-    }
-  }
-
-  private def toResultIterator(result: GetResource): Iterator[Result] = {
-    val iterator = new Iterator[Result] {
-      var idx = 0
-      var cur: Option[Result] = None
-      override def hasNext: Boolean = {
-        while(idx < result.length && cur.isEmpty) {
-          val r = result(idx)
-          idx += 1
-          if (!r.isEmpty) {
-            cur = Some(r)
-          }
-        }
-        if (cur.isEmpty) {
-          rddResources.release(result)
-        }
-        cur.isDefined
-      }
-      override def next(): Result = {
-        hasNext
-        val ret = cur.get
-        cur = None
-        ret
-      }
-    }
-    iterator
-  }
-
-  private def buildScan(range: Range,
-      filter: Option[SparkSQLPushDownFilter],
-      columns: Seq[Field]): Scan = {
-    val scan = (range.lower, range.upper) match {
-      case (Some(Bound(a, b)), Some(Bound(c, d))) => new Scan(a, c)
-      case (None, Some(Bound(c, d))) => new Scan(Array[Byte](), c)
-      case (Some(Bound(a, b)), None) => new Scan(a)
-      case (None, None) => new Scan()
-    }
-    handleTimeSemantics(scan)
-
-    columns.foreach { d =>
-      if (!d.isRowKey) {
-        scan.addColumn(d.cfBytes, d.colBytes)
-      }
-    }
-    scan.setCacheBlocks(relation.blockCacheEnable)
-    scan.setBatch(relation.batchNum)
-    scan.setCaching(relation.cacheSize)
-    filter.foreach(scan.setFilter(_))
-    scan
-  }
-  private def toResultIterator(scanner: ScanResource): Iterator[Result] = {
-    val iterator = new Iterator[Result] {
-      var cur: Option[Result] = None
-      override def hasNext: Boolean = {
-        if (cur.isEmpty) {
-          val r = scanner.next()
-          if (r == null) {
-            rddResources.release(scanner)
-          } else {
-            cur = Some(r)
-          }
-        }
-        cur.isDefined
-      }
-      override def next(): Result = {
-        hasNext
-        val ret = cur.get
-        cur = None
-        ret
-      }
-    }
-    iterator
-  }
-
-  lazy val rddResources = RDDResources(new mutable.HashSet[Resource]())
-
-  private def close() {
-    rddResources.release()
-  }
-
-  override def compute(split: Partition, context: TaskContext): Iterator[Result] = {
-    val partition = split.asInstanceOf[HBaseScanPartition]
-    val filter = SerializedFilter.fromSerializedFilter(partition.sf)
-    val scans = partition.scanRanges
-      .map(buildScan(_, filter, columns))
-    val tableResource = TableResource(relation)
-    context.addTaskCompletionListener(context => close())
-    val points = partition.points
-    val gIt: Iterator[Result] =  {
-      if (points.isEmpty) {
-        Iterator.empty: Iterator[Result]
-      } else {
-        buildGets(tableResource, points, filter, columns, hbaseContext)
-      }
-    }
-    val rIts = scans.par
-      .map { scan =>
-      hbaseContext.applyCreds()
-      val scanner = tableResource.getScanner(scan)
-      rddResources.addResource(scanner)
-      scanner
-    }.map(toResultIterator(_))
-      .fold(Iterator.empty: Iterator[Result]){ case (x, y) =>
-      x ++ y
-    } ++ gIt
-    ShutdownHookManager.affixShutdownHook( new Thread() {
-      override def run() {
-        HBaseConnectionCache.close()
-      }
-    }, 0)
-    rIts
-  }
-
-  private def handleTimeSemantics(query: Query): Unit = {
-    // Set timestamp related values if present
-    (query, relation.timestamp, relation.minTimestamp, relation.maxTimestamp)  match {
-      case (q: Scan, Some(ts), None, None) => q.setTimeStamp(ts)
-      case (q: Get, Some(ts), None, None) => q.setTimeStamp(ts)
-
-      case (q:Scan, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
-      case (q:Get, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
-
-      case (q, None, None, None) =>
-
-      case _ => throw new IllegalArgumentException(s"Invalid combination of query/timestamp/time range provided. " +
-        s"timeStamp is: ${relation.timestamp}, minTimeStamp is: ${relation.minTimestamp}, " +
-        s"maxTimeStamp is: ${relation.maxTimestamp}")
-    }
-    if (relation.maxVersions.isDefined) {
-      query match {
-        case q: Scan => q.setMaxVersions(relation.maxVersions.get)
-        case q: Get => q.setMaxVersions(relation.maxVersions.get)
-        case _ => throw new IllegalArgumentException("Invalid query provided with maxVersions")
-      }
-    }
-  }
-}
-
-case class SerializedFilter(b: Option[Array[Byte]])
-
-object SerializedFilter {
-  def toSerializedTypedFilter(f: Option[SparkSQLPushDownFilter]): SerializedFilter = {
-    SerializedFilter(f.map(_.toByteArray))
-  }
-
-  def fromSerializedFilter(sf: SerializedFilter): Option[SparkSQLPushDownFilter] = {
-    sf.b.map(SparkSQLPushDownFilter.parseFrom(_))
-  }
-}
-
-private[hbase] case class HBaseRegion(
-    override val index: Int,
-    val start: Option[HBaseType] = None,
-    val end: Option[HBaseType] = None,
-    val server: Option[String] = None) extends Partition
-
-
-private[hbase] case class HBaseScanPartition(
-    override val index: Int,
-    val regions: HBaseRegion,
-    val scanRanges: Seq[Range],
-    val points: Seq[Array[Byte]],
-    val sf: SerializedFilter) extends Partition
-
-case class RDDResources(set: mutable.HashSet[Resource]) {
-  def addResource(s: Resource) {
-    set += s
-  }
-  def release() {
-    set.foreach(release(_))
-  }
-  def release(rs: Resource) {
-    try {
-      rs.release()
-    } finally {
-      set.remove(rs)
-    }
-  }
-}
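For readers skimming the removed compute() above: it lazily concatenates one result iterator per scan range with a single iterator built from the point gets. A minimal, self-contained sketch of that concatenation pattern (plain Scala; the scanRange/multiGet helpers are hypothetical stand-ins for the HBase scanner and bulk get):

object IteratorMergeSketch {
  // Hypothetical stand-ins for tableResource.getScanner(scan) and the bulk get;
  // the real compute() produces HBase Result objects, strings are used here.
  def scanRange(range: (Int, Int)): Iterator[String] =
    (range._1 until range._2).iterator.map(i => s"scanned-row-$i")

  def multiGet(points: Seq[Int]): Iterator[String] =
    points.iterator.map(p => s"point-get-row-$p")

  def main(args: Array[String]): Unit = {
    val ranges = Seq((0, 3), (10, 12))
    val points = Seq(100, 101)
    // One lazy iterator per scan range, folded into a single iterator, with the
    // point-get iterator appended last (compute() does the same, except the
    // per-range scanners are opened through a .par collection).
    val merged: Iterator[String] =
      ranges.map(scanRange).foldLeft(Iterator.empty: Iterator[String])(_ ++ _) ++ multiGet(points)
    merged.foreach(println)
  }
}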

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala
deleted file mode 100644
index 6a50189..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.datasources
-
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.Logging
-import org.apache.spark.sql.types._
-
-/**
-  * The ranges for a data type whose size is known. Whether the bound is inclusive
-  * or exclusive is undefined; it is up to the caller to decide.
-  *
-  * @param low: the lower bound of the range.
-  * @param upper: the upper bound of the range.
-  */
-@InterfaceAudience.LimitedPrivate(Array(HBaseInterfaceAudience.SPARK))
-@InterfaceStability.Evolving
-case class BoundRange(low: Array[Byte],upper: Array[Byte])
-
-/**
-  * The class identifies the ranges for a Java primitive type. The caller needs
-  * to decide on its own whether each bound is inclusive or exclusive.
-  *
-  * @param less: the set of ranges for LessThan/LessOrEqualThan
-  * @param greater: the set of ranges for GreaterThan/GreaterThanOrEqualTo
-  * @param value: the byte array of the original value
-  */
-@InterfaceAudience.LimitedPrivate(Array(HBaseInterfaceAudience.SPARK))
-@InterfaceStability.Evolving
-case class BoundRanges(less: Array[BoundRange], greater: Array[BoundRange], value: Array[Byte])
-
-/**
-  * The trait to support a plugin architecture for different encoders/decoders.
-  * encode is used to serialize a value of the given data type to a byte array,
-  * and filter is used to filter out unnecessary records.
-  */
-@InterfaceAudience.LimitedPrivate(Array(HBaseInterfaceAudience.SPARK))
-@InterfaceStability.Evolving
-trait BytesEncoder {
-  def encode(dt: DataType, value: Any): Array[Byte]
-
-  /**
-    * The function performing real filtering operations. The format of filterBytes depends on the
-    * implementation of the BytesEncoder.
-    *
-    * @param input: the current input byte array that needs to be filtered
-    * @param offset1: the starting offset in the input byte array.
-    * @param length1: the length of the input byte array.
-    * @param filterBytes: the byte array provided by query condition.
-    * @param offset2: the starting offset in the filterBytes.
-    * @param length2: the length of the bytes in the filterBytes
-    * @param ops: The operation of the filter operator.
-    * @return true: the record satisfies the predicates
-    *         false: the record does not satisfy the predicates.
-    */
-  def filter(input: Array[Byte], offset1: Int, length1: Int,
-             filterBytes: Array[Byte], offset2: Int, length2: Int,
-             ops: JavaBytesEncoder): Boolean
-
-  /**
-    * Currently, it is used for partition pruning.
-    * For some codecs, the byte array order may be inconsistent with the order of
-    * the corresponding Java primitive type, so we may have to split a predicate on
-    * such a type into multiple predicates.
-    *
-    * For example, with the naive codec, predicates on some of the Java primitive
-    * types have to be split into multiple predicates whose results are unioned
-    * together so that filtering behaves correctly: "COLUMN < 2" is transformed into
-    * "0 <= COLUMN < 2 OR Integer.MIN_VALUE <= COLUMN <= -1".
-    */
-  def ranges(in: Any): Option[BoundRanges]
-}
-
-@InterfaceAudience.LimitedPrivate(Array(HBaseInterfaceAudience.SPARK))
-@InterfaceStability.Evolving
-object JavaBytesEncoder extends Enumeration with Logging{
-  type JavaBytesEncoder = Value
-  val Greater, GreaterEqual, Less, LessEqual, Equal, Unknown = Value
-
-  /**
-    * create the encoder/decoder
-    *
-    * @param clsName: the class name of the encoder/decoder class
-    * @return the instance of the encoder plugin.
-    */
-  def create(clsName: String): BytesEncoder = {
-    try {
-      Class.forName(clsName).newInstance.asInstanceOf[BytesEncoder]
-    } catch {
-      case _: Throwable =>
-        logWarning(s"$clsName cannot be initiated, falling back to naive encoder")
-        new NaiveEncoder()
-    }
-  }
-}
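To make the range-splitting comment on ranges() above concrete, here is a minimal self-contained sketch (plain JDK, no HBase classes, hypothetical helper names) of why big-endian two's-complement int bytes do not sort the same way as the ints themselves under unsigned lexicographic byte comparison, which is what forces "COLUMN < 2" to be split into two ranges:

object SignSplitSketch {
  import java.nio.ByteBuffer

  // Big-endian two's-complement encoding of an Int, analogous to Bytes.toBytes(Int).
  def intBytes(i: Int): Array[Byte] =
    ByteBuffer.allocate(4).putInt(i).array()

  // Unsigned lexicographic comparison, i.e. the order used when comparing raw bytes.
  def compareUnsigned(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val d = (a(i) & 0xff) - (b(i) & 0xff)
      if (d != 0) return d
      i += 1
    }
    a.length - b.length
  }

  def main(args: Array[String]): Unit = {
    // -1 < 2 numerically, but its bytes (ff ff ff ff) sort after 2's bytes (00 00 00 02),
    // which is why "COLUMN < 2" is split into [0, 2) unioned with [Int.MinValue, -1].
    println(compareUnsigned(intBytes(-1), intBytes(2)) > 0) // true
    println(compareUnsigned(intBytes(1), intBytes(2)) < 0)  // true
  }
}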

http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala
deleted file mode 100644
index 6138242..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala
+++ /dev/null
@@ -1,261 +0,0 @@
-package org.apache.hadoop.hbase.spark.datasources
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder
-import org.apache.hadoop.hbase.spark.hbase._
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.Logging
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-
-/**
-  * This is the naive, non-order-preserving encoder/decoder.
-  * Because the order of Java primitive types is inconsistent with the order of
-  * their byte arrays, the data type has to be passed in so that the filter can
-  * work correctly; this is done by storing the type in the first byte of the
-  * serialized array.
-  */
-@InterfaceAudience.Private
-class NaiveEncoder extends BytesEncoder with Logging{
-  var code = 0
-  def nextCode: Byte = {
-    code += 1
-    (code - 1).asInstanceOf[Byte]
-  }
-  val BooleanEnc = nextCode
-  val ShortEnc = nextCode
-  val IntEnc = nextCode
-  val LongEnc = nextCode
-  val FloatEnc = nextCode
-  val DoubleEnc = nextCode
-  val StringEnc = nextCode
-  val BinaryEnc = nextCode
-  val TimestampEnc = nextCode
-  val UnknownEnc = nextCode
-
-
-  /**
-    * Evaluate the Java primitive value and return the BoundRanges. A single value may have
-    * multiple output ranges because the order of a Java primitive type is inconsistent with
-    * the order of its byte array.
-    *
-    * For short, integer, and long, numeric order is consistent with byte array order when
-    * two numbers have the same sign bit, but negative numbers sort after positive numbers
-    * in byte array order.
-    *
-    * For double and float, the order of positive numbers is consistent with their byte array
-    * order, while the order of negative numbers is the reverse of their byte array order.
-    * Please refer to IEEE-754
-    * and https://en.wikipedia.org/wiki/Single-precision_floating-point_format
-    */
-  def ranges(in: Any): Option[BoundRanges] = in match {
-    case a: Integer =>
-      val b =  Bytes.toBytes(a)
-      if (a >= 0) {
-        logDebug(s"range is 0 to $a and ${Integer.MIN_VALUE} to -1")
-        Some(BoundRanges(
-          Array(BoundRange(Bytes.toBytes(0: Int), b),
-            BoundRange(Bytes.toBytes(Integer.MIN_VALUE),  Bytes.toBytes(-1: Int))),
-          Array(BoundRange(b,  Bytes.toBytes(Integer.MAX_VALUE))), b))
-      } else {
-        Some(BoundRanges(
-          Array(BoundRange(Bytes.toBytes(Integer.MIN_VALUE), b)),
-          Array(BoundRange(b, Bytes.toBytes(-1: Integer)),
-            BoundRange(Bytes.toBytes(0: Int), Bytes.toBytes(Integer.MAX_VALUE))), b))
-      }
-    case a: Long =>
-      val b =  Bytes.toBytes(a)
-      if (a >= 0) {
-        Some(BoundRanges(
-          Array(BoundRange(Bytes.toBytes(0: Long), b),
-            BoundRange(Bytes.toBytes(Long.MinValue),  Bytes.toBytes(-1: Long))),
-          Array(BoundRange(b,  Bytes.toBytes(Long.MaxValue))), b))
-      } else {
-        Some(BoundRanges(
-          Array(BoundRange(Bytes.toBytes(Long.MinValue), b)),
-          Array(BoundRange(b, Bytes.toBytes(-1: Long)),
-            BoundRange(Bytes.toBytes(0: Long), Bytes.toBytes(Long.MaxValue))), b))
-      }
-    case a: Short =>
-      val b =  Bytes.toBytes(a)
-      if (a >= 0) {
-        Some(BoundRanges(
-          Array(BoundRange(Bytes.toBytes(0: Short), b),
-            BoundRange(Bytes.toBytes(Short.MinValue),  Bytes.toBytes(-1: Short))),
-          Array(BoundRange(b,  Bytes.toBytes(Short.MaxValue))), b))
-      } else {
-        Some(BoundRanges(
-          Array(BoundRange(Bytes.toBytes(Short.MinValue), b)),
-          Array(BoundRange(b, Bytes.toBytes(-1: Short)),
-            BoundRange(Bytes.toBytes(0: Short), Bytes.toBytes(Short.MaxValue))), b))
-      }
-    case a: Double =>
-      val b =  Bytes.toBytes(a)
-      if (a >= 0.0d) {
-        Some(BoundRanges(
-          Array(BoundRange(Bytes.toBytes(0.0d), b),
-            BoundRange(Bytes.toBytes(-0.0d),  Bytes.toBytes(Double.MinValue))),
-          Array(BoundRange(b,  Bytes.toBytes(Double.MaxValue))), b))
-      } else {
-        Some(BoundRanges(
-          Array(BoundRange(b, Bytes.toBytes(Double.MinValue))),
-          Array(BoundRange(Bytes.toBytes(-0.0d), b),
-            BoundRange(Bytes.toBytes(0.0d), Bytes.toBytes(Double.MaxValue))), b))
-      }
-    case a: Float =>
-      val b =  Bytes.toBytes(a)
-      if (a >= 0.0f) {
-        Some(BoundRanges(
-          Array(BoundRange(Bytes.toBytes(0.0f), b),
-            BoundRange(Bytes.toBytes(-0.0f),  Bytes.toBytes(Float.MinValue))),
-          Array(BoundRange(b,  Bytes.toBytes(Float.MaxValue))), b))
-      } else {
-        Some(BoundRanges(
-          Array(BoundRange(b, Bytes.toBytes(Float.MinValue))),
-          Array(BoundRange(Bytes.toBytes(-0.0f), b),
-            BoundRange(Bytes.toBytes(0.0f), Bytes.toBytes(Float.MaxValue))), b))
-      }
-    case a: Array[Byte] =>
-      Some(BoundRanges(
-        Array(BoundRange(bytesMin, a)),
-        Array(BoundRange(a, bytesMax)), a))
-    case a: Byte =>
-      val b =  Array(a)
-      Some(BoundRanges(
-        Array(BoundRange(bytesMin, b)),
-        Array(BoundRange(b, bytesMax)), b))
-    case a: String =>
-      val b =  Bytes.toBytes(a)
-      Some(BoundRanges(
-        Array(BoundRange(bytesMin, b)),
-        Array(BoundRange(b, bytesMax)), b))
-    case a: UTF8String =>
-      val b = a.getBytes
-      Some(BoundRanges(
-        Array(BoundRange(bytesMin, b)),
-        Array(BoundRange(b, bytesMax)), b))
-    case _ => None
-  }
-
-  def compare(c: Int, ops: JavaBytesEncoder): Boolean = {
-    ops match {
-      case JavaBytesEncoder.Greater =>  c > 0
-      case JavaBytesEncoder.GreaterEqual =>  c >= 0
-      case JavaBytesEncoder.Less =>  c < 0
-      case JavaBytesEncoder.LessEqual =>  c <= 0
-    }
-  }
-
-  /**
-    * Encode the value into a byte array. Note that this is a naive implementation, with the
-    * data type byte prepended to the head of the serialized byte array.
-    *
-    * @param dt: The data type of the input
-    * @param value: the value of the input
-    * @return the byte array with the first byte indicating the data type.
-    */
-  override def encode(dt: DataType,
-                      value: Any): Array[Byte] = {
-    dt match {
-      case BooleanType =>
-        val result = new Array[Byte](Bytes.SIZEOF_BOOLEAN + 1)
-        result(0) = BooleanEnc
-        value.asInstanceOf[Boolean] match {
-          case true => result(1) = -1: Byte
-          case false => result(1) = 0: Byte
-        }
-        result
-      case ShortType =>
-        val result = new Array[Byte](Bytes.SIZEOF_SHORT + 1)
-        result(0) = ShortEnc
-        Bytes.putShort(result, 1, value.asInstanceOf[Short])
-        result
-      case IntegerType =>
-        val result = new Array[Byte](Bytes.SIZEOF_INT + 1)
-        result(0) = IntEnc
-        Bytes.putInt(result, 1, value.asInstanceOf[Int])
-        result
-      case LongType|TimestampType =>
-        val result = new Array[Byte](Bytes.SIZEOF_LONG + 1)
-        result(0) = LongEnc
-        Bytes.putLong(result, 1, value.asInstanceOf[Long])
-        result
-      case FloatType =>
-        val result = new Array[Byte](Bytes.SIZEOF_FLOAT + 1)
-        result(0) = FloatEnc
-        Bytes.putFloat(result, 1, value.asInstanceOf[Float])
-        result
-      case DoubleType =>
-        val result = new Array[Byte](Bytes.SIZEOF_DOUBLE + 1)
-        result(0) = DoubleEnc
-        Bytes.putDouble(result, 1, value.asInstanceOf[Double])
-        result
-      case BinaryType =>
-        val v = value.asInstanceOf[Array[Byte]]
-        val result = new Array[Byte](v.length + 1)
-        result(0) = BinaryEnc
-        System.arraycopy(v, 0, result, 1, v.length)
-        result
-      case StringType =>
-        val bytes = Bytes.toBytes(value.asInstanceOf[String])
-        val result = new Array[Byte](bytes.length + 1)
-        result(0) = StringEnc
-        System.arraycopy(bytes, 0, result, 1, bytes.length)
-        result
-      case _ =>
-        val bytes = Bytes.toBytes(value.toString)
-        val result = new Array[Byte](bytes.length + 1)
-        result(0) = UnknownEnc
-        System.arraycopy(bytes, 0, result, 1, bytes.length)
-        result
-    }
-  }
-
-  override def filter(input: Array[Byte], offset1: Int, length1: Int,
-                      filterBytes: Array[Byte], offset2: Int, length2: Int,
-                      ops: JavaBytesEncoder): Boolean = {
-    filterBytes(offset2) match {
-      case ShortEnc =>
-        val in = Bytes.toShort(input, offset1)
-        val value = Bytes.toShort(filterBytes, offset2 + 1)
-        compare(in.compareTo(value), ops)
-      case IntEnc =>
-        val in = Bytes.toInt(input, offset1)
-        val value = Bytes.toInt(filterBytes, offset2 + 1)
-        compare(in.compareTo(value), ops)
-      case LongEnc | TimestampEnc =>
-        val in = Bytes.toLong(input, offset1)
-        val value = Bytes.toLong(filterBytes, offset2 + 1)
-        compare(in.compareTo(value), ops)
-      case FloatEnc =>
-        val in = Bytes.toFloat(input, offset1)
-        val value = Bytes.toFloat(filterBytes, offset2 + 1)
-        compare(in.compareTo(value), ops)
-      case DoubleEnc =>
-        val in = Bytes.toDouble(input, offset1)
-        val value = Bytes.toDouble(filterBytes, offset2 + 1)
-        compare(in.compareTo(value), ops)
-      case _ =>
-        // for String, Byte, Binary, Boolean and other types
-        // we can use the order of byte array directly.
-        compare(
-          Bytes.compareTo(input, offset1, length1, filterBytes, offset2 + 1, length2 - 1), ops)
-    }
-  }
-}
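
As a compact recap of the tag-prefix scheme the removed NaiveEncoder implements (a type byte prepended to the serialized value on the filter side, while the cell bytes stay raw), here is a minimal sketch covering only the Int case, with a hypothetical tag constant:

object TagPrefixSketch {
  import java.nio.ByteBuffer

  // Hypothetical tag value standing in for NaiveEncoder's IntEnc code.
  val IntTag: Byte = 2

  // Mirrors the IntegerType branch of encode(): one type-tag byte followed by
  // the big-endian payload.
  def encodeInt(value: Int): Array[Byte] = {
    val out = new Array[Byte](1 + 4)
    out(0) = IntTag
    ByteBuffer.wrap(out, 1, 4).putInt(value)
    out
  }

  // Mirrors the shape of filter(): the filter-side bytes carry the tag, the cell
  // bytes do not; dispatch on the tag, decode both sides, then compare.
  def cellLessThanFilter(cellBytes: Array[Byte], filterBytes: Array[Byte]): Boolean =
    filterBytes(0) match {
      case IntTag =>
        ByteBuffer.wrap(cellBytes).getInt < ByteBuffer.wrap(filterBytes, 1, 4).getInt
      case other =>
        sys.error(s"tag $other is not handled in this sketch")
    }

  def main(args: Array[String]): Unit = {
    val filterBytes = encodeInt(2)                              // tag + serialized 2
    val cellBytes   = ByteBuffer.allocate(4).putInt(-1).array() // raw cell value -1
    println(cellLessThanFilter(cellBytes, filterBytes))         // true: -1 < 2 once decoded
  }
}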