Posted to commits@hbase.apache.org by te...@apache.org on 2015/08/12 17:29:14 UTC

hbase git commit: HBASE-14150 Add BulkLoad functionality to HBase-Spark Module (Ted Malaska)

Repository: hbase
Updated Branches:
  refs/heads/master aa3538f80 -> 72a48a133


HBASE-14150 Add BulkLoad functionality to HBase-Spark Module (Ted Malaska)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/72a48a13
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/72a48a13
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/72a48a13

Branch: refs/heads/master
Commit: 72a48a1333f6c01c46cd244439198ccce3f941ac
Parents: aa3538f
Author: tedyu <yu...@gmail.com>
Authored: Wed Aug 12 08:29:03 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Wed Aug 12 08:29:03 2015 -0700

----------------------------------------------------------------------
 .../hbase/spark/BulkLoadPartitioner.scala       |  56 ++
 .../hbase/spark/FamilyHFileWriteOptions.scala   |  35 ++
 .../hadoop/hbase/spark/HBaseContext.scala       | 294 +++++++++-
 .../hadoop/hbase/spark/HBaseRDDFunctions.scala  |  47 +-
 .../hadoop/hbase/spark/KeyFamilyQualifier.scala |  46 ++
 .../hadoop/hbase/spark/BulkLoadSuite.scala      | 537 +++++++++++++++++++
 6 files changed, 1002 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/72a48a13/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
new file mode 100644
index 0000000..c51a3af
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import java.util
+import java.util.Comparator
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.Partitioner
+
+/**
+ * A Partitioner implementation that separates records into different
+ * HBase regions based on the region splits
+ *
+ * @param startKeys   The start keys for the given table
+ */
+class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
+  extends Partitioner {
+
+  override def numPartitions: Int = startKeys.length
+
+  override def getPartition(key: Any): Int = {
+
+    val rowKey:Array[Byte] =
+      key match {
+        case qualifier: KeyFamilyQualifier =>
+          qualifier.rowKey
+        case _ =>
+          key.asInstanceOf[Array[Byte]]
+      }
+
+    val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
+      override def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
+        Bytes.compareTo(o1, o2)
+      }
+    }
+    val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
+    if (partition < 0) partition * -1 + -2
+    else partition
+  }
+}
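
A minimal sketch of how the partitioner behaves, assuming the start keys have already been fetched from a RegionLocator; a negative result from Arrays.binarySearch encodes the insertion point, which the expression above maps back to the index of the containing region:

  import org.apache.hadoop.hbase.util.Bytes

  // Region start keys; the first region always starts at the empty byte array.
  val startKeys: Array[Array[Byte]] = Array(
    Bytes.toBytes(""), Bytes.toBytes("3"), Bytes.toBytes("7"))

  val partitioner = new BulkLoadPartitioner(startKeys)

  // Raw row keys and KeyFamilyQualifier keys are both accepted.
  partitioner.getPartition(Bytes.toBytes("1"))  // 0 (before "3")
  partitioner.getPartition(Bytes.toBytes("3"))  // 1 (exact start key)
  partitioner.getPartition(Bytes.toBytes("8"))  // 2 (after "7")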

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a48a13/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala
new file mode 100644
index 0000000..7dbe140
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import java.io.Serializable
+
+/**
+ * This class holds optional settings that control how a given column
+ * family's HFile writer will work
+ *
+ * @param compression       String to define the Compression to be used in the HFile
+ * @param bloomType         String to define the bloom type to be used in the HFile
+ * @param blockSize         The block size to be used in the HFile
+ * @param dataBlockEncoding String to define the data block encoding to be used
+ *                          in the HFile
+ */
+class FamilyHFileWriteOptions( val compression:String,
+                               val bloomType: String,
+                               val blockSize: Int,
+                               val dataBlockEncoding: String) extends Serializable
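
A short sketch of how these options might be supplied to the bulk load, assuming a column family named f1; the strings are parsed with Algorithm.valueOf, BloomType.valueOf and DataBlockEncoding.valueOf, so they must match the enum constant names:

  import java.util
  import org.apache.hadoop.hbase.util.Bytes

  // GZ compression, ROW bloom filter, 128 byte blocks (illustrative only),
  // and PREFIX data block encoding for family "f1".
  val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")

  val familyOptions = new util.HashMap[Array[Byte], FamilyHFileWriteOptions]()
  familyOptions.put(Bytes.toBytes("f1"), f1Options)
  // familyOptions is then passed as familyHFileWriteOptionsMap to bulkLoad.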

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a48a13/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index f060fea..9d14e22 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -17,31 +17,35 @@
 
 package org.apache.hadoop.hbase.spark
 
-import org.apache.hadoop.hbase.TableName
+import java.net.InetSocketAddress
+import java.util
+
+import org.apache.hadoop.hbase.fs.HFileSystem
+import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.io.compress.Compression
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
+import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder, HFileWriterImpl}
+import org.apache.hadoop.hbase.regionserver.{HStore, StoreFile, BloomType}
+import org.apache.hadoop.hbase.util.Bytes
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.client.ConnectionFactory
-import org.apache.hadoop.hbase.client.Scan
-import org.apache.hadoop.hbase.client.Get
-import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.hadoop.hbase.client._
 import scala.reflect.ClassTag
-import org.apache.hadoop.hbase.client.Connection
-import org.apache.hadoop.hbase.client.Put
-import org.apache.hadoop.hbase.client.Delete
 import org.apache.spark.{Logging, SerializableWritable, SparkContext}
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
+import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
+TableInputFormat, IdentityTableMapper}
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.hbase.client.Mutation
 import org.apache.spark.streaming.dstream.DStream
 import java.io._
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat
-import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
 import org.apache.hadoop.fs.{Path, FileSystem}
+import scala.collection.mutable
 
 /**
   * HBaseContext is a façade for HBase operations
@@ -567,4 +571,270 @@ class HBaseContext(@transient sc: SparkContext,
    */
   private[spark]
   def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+  /**
+   * A Spark Implementation of HBase Bulk load
+   *
+   * This will take the content from an existing RDD then sort and shuffle
+   * it with respect to region splits.  The result of that sort and shuffle
+   * will be written to HFiles.
+   *
+   * After this function is executed the user will have to call
+   * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
+   *
+   * Also note this version of bulk load is different from past versions in
+   * that it includes the qualifier as part of the sort process. The
+   * reason for this is to be able to support rows with a very large number
+   * of columns.
+   *
+   * @param rdd                            The RDD we are bulk loading from
+   * @param tableName                      The HBase table we are loading into
+   * @param flatMap                        A flatMap function that will make every
+   *                                       row in the RDD
+   *                                       into N cells for the bulk load
+   * @param stagingDir                     The location on the FileSystem to bulk load into
+   * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
+   *                                       column family is written
+   * @param compactionExclude              Whether to exclude the HFiles from minor compaction
+   * @param maxSize                        Max size for the HFiles before they roll
+   * @tparam T                             The Type of values in the original RDD
+   */
+  def bulkLoad[T](rdd:RDD[T],
+                  tableName: TableName,
+                  flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
+                  stagingDir:String,
+                  familyHFileWriteOptionsMap:
+                  util.Map[Array[Byte], FamilyHFileWriteOptions] =
+                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
+                  compactionExclude: Boolean = false,
+                  maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
+  Unit = {
+    val conn = ConnectionFactory.createConnection(config)
+    val regionLocator = conn.getRegionLocator(tableName)
+    val startKeys = regionLocator.getStartKeys
+    val defaultCompressionStr = config.get("hfile.compression",
+      Compression.Algorithm.NONE.getName)
+    val defaultCompression = HFileWriterImpl
+      .compressionByName(defaultCompressionStr)
+    val now = System.currentTimeMillis()
+    val tableNameByteArray = tableName.getName
+
+    val familyHFileWriteOptionsMapInternal =
+      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
+
+    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
+
+    while (entrySetIt.hasNext) {
+      val entry = entrySetIt.next()
+      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
+    }
+
+    /**
+     *  This will return a new HFile writer when requested
+     *
+     * @param family       column family
+     * @param conf         configuration to connect to HBase
+     * @param favoredNodes nodes that we would like to write to
+     * @param fs           FileSystem object where we will be writing the HFiles to
+     * @return WriterLength object
+     */
+    def getNewWriter(family: Array[Byte], conf: Configuration,
+                     favoredNodes: Array[InetSocketAddress],
+                     fs:FileSystem,
+                     familydir:Path): WriterLength = {
+
+
+      var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))
+
+      if (familyOptions == null) {
+        familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
+          BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
+        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
+      }
+
+      val tempConf = new Configuration(conf)
+      tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
+      val contextBuilder = new HFileContextBuilder()
+        .withCompression(Algorithm.valueOf(familyOptions.compression))
+        .withChecksumType(HStore.getChecksumType(conf))
+        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+        .withBlockSize(familyOptions.blockSize)
+      contextBuilder.withDataBlockEncoding(DataBlockEncoding.
+        valueOf(familyOptions.dataBlockEncoding))
+      val hFileContext = contextBuilder.build()
+
+      if (null == favoredNodes) {
+        new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+          .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
+          .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
+      } else {
+        new WriterLength(0,
+          new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+          .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
+          .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+          .withFavoredNodes(favoredNodes).build())
+      }
+    }
+
+    val regionSplitPartitioner =
+      new BulkLoadPartitioner(startKeys)
+
+    //This is where all the magic happens
+    //Here we are going to do the following things
+    // 1. FlatMap every row in the RDD into (key, column, value) tuples
+    // 2. Then we are going to repartition, sort, and shuffle
+    // 3. Finally we are going to write out our HFiles
+    rdd.flatMap( r => flatMap(r)).
+      repartitionAndSortWithinPartitions(regionSplitPartitioner).
+      hbaseForeachPartition(this, (it, conn) => {
+
+      val conf = broadcastedConf.value.value
+      val fs = FileSystem.get(conf)
+      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
+      var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
+      var rollOverRequested = false
+
+      /**
+       * This will roll all writers
+       */
+      def rollWriters(): Unit = {
+        writerMap.values.foreach( wl => {
+          if (wl.writer != null) {
+            logDebug("Writer=" + wl.writer.getPath +
+              (if (wl.written == 0) "" else ", wrote=" + wl.written))
+            close(wl.writer)
+          }
+        })
+        writerMap.clear()
+        rollOverRequested = false
+      }
+
+      /**
+       * This function will close a given HFile writer
+       * @param w The writer to close
+       */
+      def close(w:StoreFile.Writer): Unit = {
+        if (w != null) {
+          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+            Bytes.toBytes(System.currentTimeMillis()))
+          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+            Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
+          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+            Bytes.toBytes(true))
+          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+            Bytes.toBytes(compactionExclude))
+          w.appendTrackedTimestampsToMetadata()
+          w.close()
+        }
+      }
+
+      //Here is where we finally iterate through the data in this partition of the
+      //RDD that has been sorted and partitioned
+      it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
+
+        //This will get a writer for the column family
+        //If there is no writer for a given column family then
+        //it will get created here.
+        val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(keyFamilyQualifier.family), {
+
+          val familyDir = new Path(stagingDir, Bytes.toString(keyFamilyQualifier.family))
+
+          fs.mkdirs(familyDir)
+
+          val loc:HRegionLocation = {
+            try {
+              val locator =
+                conn.getRegionLocator(TableName.valueOf(tableNameByteArray))
+              locator.getRegionLocation(keyFamilyQualifier.rowKey)
+            } catch {
+              case e: Throwable =>
+                logWarning("there's something wrong when locating rowkey: " +
+                  Bytes.toString(keyFamilyQualifier.rowKey))
+                null
+            }
+          }
+          if (null == loc) {
+            if (log.isTraceEnabled) {
+              logTrace("failed to get region location, so use default writer: " +
+                Bytes.toString(keyFamilyQualifier.rowKey))
+            }
+            getNewWriter(family = keyFamilyQualifier.family, conf = conf, favoredNodes = null,
+              fs = fs, familydir = familyDir)
+          } else {
+            if (log.isDebugEnabled) {
+              logDebug("first rowkey: [" + Bytes.toString(keyFamilyQualifier.rowKey) + "]")
+            }
+            val initialIsa =
+              new InetSocketAddress(loc.getHostname, loc.getPort)
+            if (initialIsa.isUnresolved) {
+              if (log.isTraceEnabled) {
+                logTrace("failed to resolve bind address: " + loc.getHostname + ":"
+                  + loc.getPort + ", so use default writer")
+              }
+              getNewWriter(keyFamilyQualifier.family, conf, null, fs, familyDir)
+            } else {
+              if(log.isDebugEnabled) {
+                logDebug("use favored nodes writer: " + initialIsa.getHostString)
+              }
+              getNewWriter(keyFamilyQualifier.family, conf,
+                Array[InetSocketAddress](initialIsa), fs, familyDir)
+            }
+          }
+        })
+
+        val keyValue = new KeyValue(keyFamilyQualifier.rowKey,
+          keyFamilyQualifier.family,
+          keyFamilyQualifier.qualifier,
+          now, cellValue)
+
+        wl.writer.append(keyValue)
+        wl.written += keyValue.getLength
+
+        rollOverRequested = rollOverRequested || wl.written > maxSize
+
+        //This will only roll if we have at least one column family file that is
+        //bigger than maxSize and we have finished a given row key
+        if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
+          rollWriters()
+        }
+
+        previousRow = keyFamilyQualifier.rowKey
+      }
+      //We have finished all the data so lets close up the writers
+      rollWriters()
+    })
+  }
+
+  /**
+   * This is a wrapper class around StoreFile.Writer.  The reason for the
+   * wrapper is to keep the length of the file alongside the writer
+   *
+   * @param written The number of bytes written to the writer
+   * @param writer  The writer to be wrapped
+   */
+  class WriterLength(var written:Long, val writer:StoreFile.Writer)
+
+  /**
+   * This is a wrapper over a byte array so it can work as
+   * a key in a hashMap
+   *
+   * @param o1 The Byte Array value
+   */
+  class ByteArrayWrapper (val o1:Array[Byte])
+    extends Comparable[ByteArrayWrapper] with Serializable {
+    override def compareTo(o2: ByteArrayWrapper): Int = {
+      Bytes.compareTo(o1,o2.o1)
+    }
+    override def equals(o2: Any): Boolean = {
+      o2 match {
+        case wrapper: ByteArrayWrapper =>
+          Bytes.equals(o1, wrapper.o1)
+        case _ =>
+          false
+      }
+    }
+    override def hashCode():Int = {
+      Bytes.hashCode(o1)
+    }
+  }
 }
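
A hedged end-to-end sketch of calling the new bulkLoad API, modeled on the tests further down; sc, config and rdd (an RDD of (rowKey, family, qualifier, value) byte-array tuples) are assumed to exist, and the staging path is hypothetical:

  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.ConnectionFactory
  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

  val hbaseContext = new HBaseContext(sc, config)
  val stagingDir = "/tmp/bulkload-staging"   // hypothetical staging location

  // Sort/shuffle the RDD by (rowKey, family, qualifier) and write HFiles.
  hbaseContext.bulkLoad[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])](rdd,
    TableName.valueOf("t1"),
    t => Seq((new KeyFamilyQualifier(t._1, t._2, t._3), t._4)).iterator,
    stagingDir)

  // The HFiles are only staged; hand them to HBase with LoadIncrementalHFiles.
  val conn = ConnectionFactory.createConnection(config)
  val table = conn.getTable(TableName.valueOf("t1"))
  try {
    new LoadIncrementalHFiles(config).doBulkLoad(new Path(stagingDir),
      conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf("t1")))
  } finally {
    table.close()
    conn.close()
  }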

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a48a13/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
index fb8456d..7c59145 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
@@ -17,11 +17,15 @@
 
 package org.apache.hadoop.hbase.spark
 
-import org.apache.hadoop.hbase.TableName
+import java.util
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hbase.{HConstants, TableName}
 import org.apache.hadoop.hbase.client._
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.spark.rdd.RDD
 
+import scala.collection.immutable.HashMap
 import scala.reflect.ClassTag
 
 /**
@@ -158,5 +162,46 @@ object HBaseRDDFunctions
     RDD[R] = {
       hc.mapPartitions[T,R](rdd, f)
     }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * bulkLoad method.
+     *
+     * A Spark Implementation of HBase Bulk load
+     *
+     * This will take the content from an existing RDD then sort and shuffle
+     * it with respect to region splits.  The result of that sort and shuffle
+     * will be written to HFiles.
+     *
+     * After this function is executed the user will have to call
+     * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
+     *
+     * Also note this version of bulk load is different from past versions in
+     * that it includes the qualifier as part of the sort process. The
+     * reason for this is to be able to support rows with a very large number
+     * of columns.
+     *
+     * @param tableName                      The HBase table we are loading into
+     * @param flatMap                        A flatMap function that will make every row in the RDD
+     *                                       into N cells for the bulk load
+     * @param stagingDir                     The location on the FileSystem to bulk load into
+     * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
+     *                                       column family is written
+     * @param compactionExclude              Whether to exclude the HFiles from minor compaction
+     * @param maxSize                        Max size for the HFiles before they roll
+     */
+    def hbaseBulkLoad(hc: HBaseContext,
+                         tableName: TableName,
+                         flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
+                         stagingDir:String,
+                         familyHFileWriteOptionsMap:
+                         util.Map[Array[Byte], FamilyHFileWriteOptions] =
+                         new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
+                         compactionExclude: Boolean = false,
+                         maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = {
+      hc.bulkLoad(rdd, tableName,
+        flatMap, stagingDir, familyHFileWriteOptionsMap,
+        compactionExclude, maxSize)
+    }
   }
 }
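
With the implicit in scope, the same bulk load can be driven straight from the RDD; a brief sketch, reusing the hypothetical rdd, hbaseContext and staging path from the sketch above and leaving the remaining parameters at their defaults:

  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._

  rdd.hbaseBulkLoad(hbaseContext,
    TableName.valueOf("t1"),
    (t: (Array[Byte], Array[Byte], Array[Byte], Array[Byte])) =>
      Seq((new KeyFamilyQualifier(t._1, t._2, t._3), t._4)).iterator,
    "/tmp/bulkload-staging")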

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a48a13/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
new file mode 100644
index 0000000..b2eda2c
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import java.io.Serializable
+
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * This is the key to be used for sorting and shuffling.
+ *
+ * We will only partition on the rowKey, but we will sort on all three
+ *
+ * @param rowKey    Record RowKey
+ * @param family    Record ColumnFamily
+ * @param qualifier Cell Qualifier
+ */
+class KeyFamilyQualifier(val rowKey:Array[Byte], val family:Array[Byte], val qualifier:Array[Byte])
+  extends Comparable[KeyFamilyQualifier] with Serializable {
+  override def compareTo(o: KeyFamilyQualifier): Int = {
+    var result = Bytes.compareTo(rowKey, o.rowKey)
+    if (result == 0) {
+      result = Bytes.compareTo(family, o.family)
+      if (result == 0) result = Bytes.compareTo(qualifier, o.qualifier)
+    }
+    result
+  }
+  override def toString: String = {
+    Bytes.toString(rowKey) + ":" + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
+  }
+}
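
A small sketch of the ordering this key provides, showing that the row key dominates, then the family, then the qualifier:

  import org.apache.hadoop.hbase.util.Bytes

  val a = new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("f1"), Bytes.toBytes("a"))
  val b = new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("f1"), Bytes.toBytes("b"))
  val c = new KeyFamilyQualifier(Bytes.toBytes("row2"), Bytes.toBytes("f1"), Bytes.toBytes("a"))

  assert(a.compareTo(b) < 0)       // same row and family: ordered by qualifier
  assert(b.compareTo(c) < 0)       // different rows: row key decides first
  assert(a.toString == "row1:f1:a")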

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a48a13/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
new file mode 100644
index 0000000..2e5381a
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hbase.client.{Get, ConnectionFactory}
+import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile}
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
+import org.apache.hadoop.hbase.{HConstants, CellUtil, HBaseTestingUtility, TableName}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.spark.{SparkContext, Logging}
+import org.junit.rules.TemporaryFolder
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
+class BulkLoadSuite extends FunSuite with
+BeforeAndAfterEach with BeforeAndAfterAll  with Logging {
+  @transient var sc: SparkContext = null
+  var TEST_UTIL = new HBaseTestingUtility
+
+  val tableName = "t1"
+  val columnFamily1 = "f1"
+  val columnFamily2 = "f2"
+  val testFolder = new TemporaryFolder()
+
+
+  override def beforeAll() {
+    TEST_UTIL.startMiniCluster()
+    logInfo(" - minicluster started")
+
+    try {
+      TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+    } catch {
+      case e: Exception =>
+        logInfo(" - no table " + tableName + " found")
+    }
+
+    logInfo(" - created table")
+
+    val envMap = Map[String,String](("Xmx", "512m"))
+
+    sc = new SparkContext("local", "test", null, Nil, envMap)
+  }
+
+  override def afterAll() {
+    logInfo("shutting down minicluster")
+    TEST_UTIL.shutdownMiniCluster()
+    logInfo(" - minicluster shut down")
+    TEST_UTIL.cleanupTestDir()
+    sc.stop()
+  }
+
+  test("Basic Test multi family and multi column tests " +
+    "with all default HFile Configs") {
+    val config = TEST_UTIL.getConfiguration
+
+    logInfo(" - creating table " + tableName)
+    TEST_UTIL.createTable(TableName.valueOf(tableName),
+      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))
+
+    //There are a number of tests in here.
+    // 1. Row keys are not in order
+    // 2. Qualifiers are not in order
+    // 3. Column Families are not in order
+    // 4. Some records are in one column family and some are in two column families
+    // 5. Some records have a single qualifier and some have two
+    val rdd = sc.parallelize(Array(
+      (Bytes.toBytes("1"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
+      (Bytes.toBytes("3"),
+        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a")))),
+      (Bytes.toBytes("3"),
+        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b")))),
+      (Bytes.toBytes("3"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c")))),
+      (Bytes.toBytes("5"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))),
+      (Bytes.toBytes("4"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))),
+      (Bytes.toBytes("4"),
+        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))),
+      (Bytes.toBytes("2"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))),
+      (Bytes.toBytes("2"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))))
+
+    val hbaseContext = new HBaseContext(sc, config)
+
+    testFolder.create()
+    val stagingFolder = testFolder.newFolder()
+
+    hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
+      TableName.valueOf(tableName),
+      t => {
+        val rowKey = t._1
+        val family:Array[Byte] = t._2(0)._1
+        val qualifier = t._2(0)._2
+        val value = t._2(0)._3
+
+        val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
+
+        Seq((keyFamilyQualifier, value)).iterator
+      },
+      stagingFolder.getPath)
+
+    val fs = FileSystem.get(config)
+    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)
+
+    val conn = ConnectionFactory.createConnection(config)
+
+    val load = new LoadIncrementalHFiles(config)
+    val table = conn.getTable(TableName.valueOf(tableName))
+    try {
+      load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table,
+        conn.getRegionLocator(TableName.valueOf(tableName)))
+
+      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
+      assert(cells5.size == 1)
+      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
+
+      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
+      assert(cells4.size == 2)
+      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
+      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
+
+      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
+      assert(cells3.size == 3)
+      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
+      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
+      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))
+
+
+      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
+      assert(cells2.size == 2)
+      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
+      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
+
+      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
+      assert(cells1.size == 1)
+      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
+
+    } finally {
+      table.close()
+      val admin = ConnectionFactory.createConnection(config).getAdmin
+      try {
+        admin.disableTable(TableName.valueOf(tableName))
+        admin.deleteTable(TableName.valueOf(tableName))
+      } finally {
+        admin.close()
+      }
+      fs.delete(new Path(stagingFolder.getPath), true)
+
+      testFolder.delete()
+
+    }
+  }
+
+  test("bulkLoad to test HBase client: Test Roll Over and " +
+    "using an implicit call to bulk load") {
+    val config = TEST_UTIL.getConfiguration
+
+    logInfo(" - creating table " + tableName)
+    TEST_UTIL.createTable(TableName.valueOf(tableName),
+      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))
+
+    //There are a number of tests in here.
+    // 1. Row keys are not in order
+    // 2. Qualifiers are not in order
+    // 3. Column Families are not in order
+    // 4. Some records are in one column family and some are in two column families
+    // 5. Some records have a single qualifier and some have two
+    val rdd = sc.parallelize(Array(
+      (Bytes.toBytes("1"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
+      (Bytes.toBytes("3"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b")))),
+      (Bytes.toBytes("3"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a")))),
+      (Bytes.toBytes("3"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c")))),
+      (Bytes.toBytes("5"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))),
+      (Bytes.toBytes("4"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))),
+      (Bytes.toBytes("4"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))),
+      (Bytes.toBytes("2"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))),
+      (Bytes.toBytes("2"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))))
+
+    val hbaseContext = new HBaseContext(sc, config)
+
+    testFolder.create()
+    val stagingFolder = testFolder.newFolder()
+
+    rdd.hbaseBulkLoad(hbaseContext,
+      TableName.valueOf(tableName),
+      t => {
+        val rowKey = t._1
+        val family:Array[Byte] = t._2(0)._1
+        val qualifier = t._2(0)._2
+        val value = t._2(0)._3
+
+        val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
+
+        Seq((keyFamilyQualifier, value)).iterator
+      },
+      stagingFolder.getPath,
+      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
+      compactionExclude = false,
+      20)
+
+    val fs = FileSystem.get(config)
+    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 1)
+
+    assert(fs.listStatus(new Path(stagingFolder.getPath+ "/f1")).length == 5)
+
+    val conn = ConnectionFactory.createConnection(config)
+
+    val load = new LoadIncrementalHFiles(config)
+    val table = conn.getTable(TableName.valueOf(tableName))
+    try {
+      load.doBulkLoad(new Path(stagingFolder.getPath),
+        conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+
+      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
+      assert(cells5.size == 1)
+      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
+
+      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
+      assert(cells4.size == 2)
+      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
+      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
+
+      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
+      assert(cells3.size == 3)
+      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.a"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
+      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("b"))
+      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.c"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("c"))
+
+      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
+      assert(cells2.size == 2)
+      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
+      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
+
+      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
+      assert(cells1.size == 1)
+      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
+
+    } finally {
+      table.close()
+      val admin = ConnectionFactory.createConnection(config).getAdmin
+      try {
+        admin.disableTable(TableName.valueOf(tableName))
+        admin.deleteTable(TableName.valueOf(tableName))
+      } finally {
+        admin.close()
+      }
+      fs.delete(new Path(stagingFolder.getPath), true)
+
+      testFolder.delete()
+    }
+  }
+
+  test("Basic Test multi family and multi column tests" +
+    " with one column family with custom configs plus multi region") {
+    val config = TEST_UTIL.getConfiguration
+
+    val splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](2)
+    splitKeys(0) = Bytes.toBytes("2")
+    splitKeys(1) = Bytes.toBytes("4")
+
+    logInfo(" - creating table " + tableName)
+    TEST_UTIL.createTable(TableName.valueOf(tableName),
+      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)),
+      splitKeys)
+
+    //There are a number of tests in here.
+    // 1. Row keys are not in order
+    // 2. Qualifiers are not in order
+    // 3. Column Families are not in order
+    // 4. Some records are in one column family and some are in two column families
+    // 5. Some records have a single qualifier and some have two
+    val rdd = sc.parallelize(Array(
+      (Bytes.toBytes("1"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
+      (Bytes.toBytes("3"),
+        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a")))),
+      (Bytes.toBytes("3"),
+        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b")))),
+      (Bytes.toBytes("3"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c")))),
+      (Bytes.toBytes("5"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))),
+      (Bytes.toBytes("4"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))),
+      (Bytes.toBytes("4"),
+        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))),
+      (Bytes.toBytes("2"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))),
+      (Bytes.toBytes("2"),
+        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))))
+
+    val hbaseContext = new HBaseContext(sc, config)
+
+    testFolder.create()
+    val stagingFolder = testFolder.newFolder()
+
+    val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
+
+    val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128,
+      "PREFIX")
+
+    familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)
+
+    hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
+      TableName.valueOf(tableName),
+      t => {
+        val rowKey = t._1
+        val family:Array[Byte] = t._2(0)._1
+        val qualifier = t._2(0)._2
+        val value = t._2(0)._3
+
+        val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
+
+        Seq((keyFamilyQualifier, value)).iterator
+      },
+      stagingFolder.getPath,
+      familyHBaseWriterOptions,
+      compactionExclude = false,
+      HConstants.DEFAULT_MAX_FILE_SIZE)
+
+    val fs = FileSystem.get(config)
+    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)
+
+    val f1FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f1"))
+    for ( i <- 0 until f1FileList.length) {
+      val reader = HFile.createReader(fs, f1FileList(i).getPath,
+        new CacheConfig(config), config)
+      assert(reader.getCompressionAlgorithm.getName.equals("gz"))
+      assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
+    }
+
+    assert( 3 ==  f1FileList.length)
+
+    val f2FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f2"))
+    for ( i <- 0 until f2FileList.length) {
+      val reader = HFile.createReader(fs, f2FileList(i).getPath,
+        new CacheConfig(config), config)
+      assert(reader.getCompressionAlgorithm.getName.equals("none"))
+      assert(reader.getDataBlockEncoding.name().equals("NONE"))
+    }
+
+    assert( 2 ==  f2FileList.length)
+
+
+    val conn = ConnectionFactory.createConnection(config)
+
+    val load = new LoadIncrementalHFiles(config)
+    val table = conn.getTable(TableName.valueOf(tableName))
+    try {
+      load.doBulkLoad(new Path(stagingFolder.getPath),
+        conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+
+      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
+      assert(cells5.size == 1)
+      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
+
+      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
+      assert(cells4.size == 2)
+      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
+      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
+
+      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
+      assert(cells3.size == 3)
+      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
+      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
+      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))
+
+
+      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
+      assert(cells2.size == 2)
+      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
+      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
+
+      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
+      assert(cells1.size == 1)
+      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
+      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
+      assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
+
+    } finally {
+      table.close()
+      val admin = ConnectionFactory.createConnection(config).getAdmin
+      try {
+        admin.disableTable(TableName.valueOf(tableName))
+        admin.deleteTable(TableName.valueOf(tableName))
+      } finally {
+        admin.close()
+      }
+      fs.delete(new Path(stagingFolder.getPath), true)
+
+      testFolder.delete()
+
+    }
+  }
+
+  test("bulkLoad partitioner tests") {
+
+    var splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](3)
+    splitKeys(0) = Bytes.toBytes("")
+    splitKeys(1) = Bytes.toBytes("3")
+    splitKeys(2) = Bytes.toBytes("7")
+
+    var partitioner = new BulkLoadPartitioner(splitKeys)
+
+    assert(0 == partitioner.getPartition(Bytes.toBytes("")))
+    assert(0 == partitioner.getPartition(Bytes.toBytes("1")))
+    assert(0 == partitioner.getPartition(Bytes.toBytes("2")))
+    assert(1 == partitioner.getPartition(Bytes.toBytes("3")))
+    assert(1 == partitioner.getPartition(Bytes.toBytes("4")))
+    assert(1 == partitioner.getPartition(Bytes.toBytes("6")))
+    assert(2 == partitioner.getPartition(Bytes.toBytes("7")))
+    assert(2 == partitioner.getPartition(Bytes.toBytes("8")))
+
+
+    splitKeys = new Array[Array[Byte]](1)
+    splitKeys(0) = Bytes.toBytes("")
+
+    partitioner = new BulkLoadPartitioner(splitKeys)
+
+    assert(0 == partitioner.getPartition(Bytes.toBytes("")))
+    assert(0 == partitioner.getPartition(Bytes.toBytes("1")))
+    assert(0 == partitioner.getPartition(Bytes.toBytes("2")))
+    assert(0 == partitioner.getPartition(Bytes.toBytes("3")))
+    assert(0 == partitioner.getPartition(Bytes.toBytes("4")))
+    assert(0 == partitioner.getPartition(Bytes.toBytes("6")))
+    assert(0 == partitioner.getPartition(Bytes.toBytes("7")))
+
+    splitKeys = new Array[Array[Byte]](7)
+    splitKeys(0) = Bytes.toBytes("")
+    splitKeys(1) = Bytes.toBytes("02")
+    splitKeys(2) = Bytes.toBytes("04")
+    splitKeys(3) = Bytes.toBytes("06")
+    splitKeys(4) = Bytes.toBytes("08")
+    splitKeys(5) = Bytes.toBytes("10")
+    splitKeys(6) = Bytes.toBytes("12")
+
+    partitioner = new BulkLoadPartitioner(splitKeys)
+
+    assert(0 == partitioner.getPartition(Bytes.toBytes("")))
+    assert(0 == partitioner.getPartition(Bytes.toBytes("01")))
+    assert(1 == partitioner.getPartition(Bytes.toBytes("02")))
+    assert(1 == partitioner.getPartition(Bytes.toBytes("03")))
+    assert(2 == partitioner.getPartition(Bytes.toBytes("04")))
+    assert(2 == partitioner.getPartition(Bytes.toBytes("05")))
+    assert(3 == partitioner.getPartition(Bytes.toBytes("06")))
+    assert(3 == partitioner.getPartition(Bytes.toBytes("07")))
+    assert(4 == partitioner.getPartition(Bytes.toBytes("08")))
+    assert(4 == partitioner.getPartition(Bytes.toBytes("09")))
+    assert(5 == partitioner.getPartition(Bytes.toBytes("10")))
+    assert(5 == partitioner.getPartition(Bytes.toBytes("11")))
+    assert(6 == partitioner.getPartition(Bytes.toBytes("12")))
+    assert(6 == partitioner.getPartition(Bytes.toBytes("13")))
+
+  }
+
+
+}