Posted to commits@hbase.apache.org by bu...@apache.org on 2017/04/24 18:15:21 UTC

hbase git commit: HBASE-17933: [hbase-spark] Support Java API for bulkload

Repository: hbase
Updated Branches:
  refs/heads/master 9a1aff447 -> 49f707fba


HBASE-17933: [hbase-spark] Support Java API for bulkload

Signed-off-by: Sean Busbey <bu...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/49f707fb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/49f707fb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/49f707fb

Branch: refs/heads/master
Commit: 49f707fba7c6a9f0210f387e31d1be9f108991f8
Parents: 9a1aff4
Author: Yi Liang <ea...@gmail.com>
Authored: Fri Apr 21 18:10:03 2017 -0700
Committer: Sean Busbey <bu...@apache.org>
Committed: Mon Apr 24 11:48:29 2017 -0500

----------------------------------------------------------------------
 .../hbasecontext/JavaHBaseBulkLoadExample.java  | 102 ++++++++++
 .../hbase/spark/FamiliesQualifiersValues.scala  |  12 +-
 .../hadoop/hbase/spark/JavaHBaseContext.scala   |  68 ++++++-
 .../hbase/spark/TestJavaHBaseContext.java       | 201 ++++++++++++++++++-
 4 files changed, 371 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
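
In short, the new Java entry point is called as in the following minimal sketch, condensed from the JavaHBaseBulkLoadExample.java added below (same imports as that file). The table name mirrors the example, and "/tmp/hfiles" is a placeholder staging directory:

  JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("bulkload"));
  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, HBaseConfiguration.create());

  // Records of the form "row,family,qualifier,value"; BulkLoadFunction (defined in the
  // example below) converts each record to a Pair<KeyFamilyQualifier, byte[]>.
  JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("1,f1,b,1", "2,f1,a,3"));
  hbaseContext.bulkLoad(rdd, TableName.valueOf("bulkload-table-test"), new BulkLoadFunction(),
      "/tmp/hfiles", new HashMap<byte[], FamilyHFileWriteOptions>(), false,
      HConstants.DEFAULT_MAX_FILE_SIZE);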


http://git-wip-us.apache.org/repos/asf/hbase/blob/49f707fb/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
new file mode 100644
index 0000000..040546d
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.spark.FamilyHFileWriteOptions;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.spark.KeyFamilyQualifier;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Run this example using the command below:
+ *
+ *  SPARK_HOME/bin/spark-submit --master local[2] --class org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkLoadExample
+ *  path/to/hbase-spark.jar {path/to/output/HFiles}
+ *
+ * This example writes HFiles to {path/to/output/HFiles}; the user can then run
+ * 'hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles' to load those HFiles into the table and verify this example.
+ */
+final public class JavaHBaseBulkLoadExample {
+  private JavaHBaseBulkLoadExample() {}
+
+  public static void main(String[] args) {
+    if (args.length < 1) {
+      System.out.println("JavaHBaseBulkLoadExample {outputPath}");
+      return;
+    }
+
+    String tableName = "bulkload-table-test";
+    String columnFamily1 = "f1";
+    String columnFamily2 = "f2";
+
+    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkLoadExample " + tableName);
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+    try {
+      List<String> list = new ArrayList<String>();
+      // row1
+      list.add("1," + columnFamily1 + ",b,1");
+      // row3
+      list.add("3," + columnFamily1 + ",a,2");
+      list.add("3," + columnFamily1 + ",b,1");
+      list.add("3," + columnFamily2 + ",a,1");
+      // row2
+      list.add("2," + columnFamily2 + ",a,3");
+      list.add("2," + columnFamily2 + ",b,3");
+
+      JavaRDD<String> rdd = jsc.parallelize(list);
+
+      Configuration conf = HBaseConfiguration.create();
+      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+      hbaseContext.bulkLoad(rdd, TableName.valueOf(tableName), new BulkLoadFunction(), args[0],
+          new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
+    } finally {
+      jsc.stop();
+    }
+  }
+
+  public static class BulkLoadFunction implements Function<String, Pair<KeyFamilyQualifier, byte[]>> {
+
+    @Override
+    public Pair<KeyFamilyQualifier, byte[]> call(String v1) throws Exception {
+      if (v1 == null) {
+        return null;
+      }
+      String[] strs = v1.split(",");
+      if (strs.length != 4) {
+        return null;
+      }
+      KeyFamilyQualifier kfq = new KeyFamilyQualifier(Bytes.toBytes(strs[0]), Bytes.toBytes(strs[1]),
+          Bytes.toBytes(strs[2]));
+      return new Pair<>(kfq, Bytes.toBytes(strs[3]));
+    }
+  }
+}
\ No newline at end of file
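
Besides the command-line LoadIncrementalHFiles invocation mentioned in the javadoc above,
the generated HFiles can also be loaded programmatically, as the new tests below do. A
minimal sketch, assuming the example has written its HFiles to the placeholder path
"/tmp/hfiles" and runs inside a method declared to throw Exception:

  Configuration conf = HBaseConfiguration.create();
  TableName tn = TableName.valueOf("bulkload-table-test");
  try (Connection conn = ConnectionFactory.createConnection(conf);
       Admin admin = conn.getAdmin();
       Table table = conn.getTable(tn)) {
    // Moves the HFiles under /tmp/hfiles into the regions of the target table.
    new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/hfiles"), admin, table,
        conn.getRegionLocator(tn));
  }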

http://git-wip-us.apache.org/repos/asf/hbase/blob/49f707fb/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
index 92bb3b7..7733802 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
@@ -55,4 +55,14 @@ class FamiliesQualifiersValues extends Serializable {
 
     qualifierValues.put(new ByteArrayWrapper(qualifier), value)
   }
-}
+
+  /**
+    * A Java-friendly wrapper for the "+=" method above.
+    * @param family    HBase column family
+    * @param qualifier HBase column qualifier
+    * @param value     HBase cell value
+    */
+  def add(family: Array[Byte], qualifier: Array[Byte], value: Array[Byte]): Unit = {
+    this += (family, qualifier, value)
+  }
+}
\ No newline at end of file
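
The new add method exists because Scala's "+=" operator is not directly callable from
Java. A minimal Java sketch of populating a FamiliesQualifiersValues (the families,
qualifiers, and values are placeholders):

  FamiliesQualifiersValues fqv = new FamiliesQualifiersValues();
  // add(family, qualifier, value)
  fqv.add(Bytes.toBytes("f1"), Bytes.toBytes("a"), Bytes.toBytes("1"));
  fqv.add(Bytes.toBytes("f1"), Bytes.toBytes("b"), Bytes.toBytes("2"));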

http://git-wip-us.apache.org/repos/asf/hbase/blob/49f707fb/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
index 253b386..57029f3 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
@@ -17,9 +17,12 @@
 
 package org.apache.hadoop.hbase.spark
 
+import java.util.Map
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair
+import org.apache.hadoop.hbase.classification.InterfaceAudience
 import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan}
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
@@ -268,7 +271,6 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
    * generates a new DStream based on Gets and the results
    * they bring back from HBase
    *
-
    * @param tableName     The name of the table to get from
    * @param batchSize     The number of gets to be batched together
    * @param javaDStream   Original DStream with data to iterate over
@@ -292,6 +294,67 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
   }
 
   /**
+    * A simple abstraction over the HBaseContext.bulkLoad method.
+    * It additionally lets a Java user take a JavaRDD and convert it
+    * into a new JavaRDD[Pair] via the given map function; the resulting
+    * HFiles are written to stagingDir, ready for bulk loading.
+    *
+    * @param javaRdd                        The javaRDD we are bulk loading from
+    * @param tableName                      The HBase table we are loading into
+    * @param mapFunc                        A Function that will convert a value in JavaRDD
+    *                                       to Pair(KeyFamilyQualifier, Array[Byte])
+    * @param stagingDir                     The location on the FileSystem to bulk load into
+    * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
+    *                                       column family is written
+    * @param compactionExclude              Whether the generated HFiles should be excluded from compaction
+    * @param maxSize                        Max size for the HFiles before they roll
+    */
+  def bulkLoad[T](javaRdd: JavaRDD[T],
+                  tableName: TableName,
+                  mapFunc: Function[T, Pair[KeyFamilyQualifier, Array[Byte]]],
+                  stagingDir: String,
+                  familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
+                  compactionExclude: Boolean,
+                  maxSize: Long):
+  Unit = {
+    hbaseContext.bulkLoad[Pair[KeyFamilyQualifier, Array[Byte]]](javaRdd.map(mapFunc).rdd, tableName, t => {
+      val keyFamilyQualifier = t.getFirst
+      val value = t.getSecond
+      Seq((keyFamilyQualifier, value)).iterator
+    }, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
+  }
+
+  /**
+    * A simple abstraction over the HBaseContext.bulkLoadThinRows method.
+    * It additionally lets a Java user take a JavaRDD and convert it
+    * into a new JavaRDD[Pair] via the given map function; the resulting
+    * HFiles are written to stagingDir, ready for bulk loading.
+    *
+    * @param javaRdd                        The javaRDD we are bulk loading from
+    * @param tableName                      The HBase table we are loading into
+    * @param mapFunc                        A Function that will convert a value in JavaRDD
+    *                                       to Pair(ByteArrayWrapper, FamiliesQualifiersValues)
+    * @param stagingDir                     The location on the FileSystem to bulk load into
+    * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
+    *                                       column family is written
+    * @param compactionExclude              Whether the generated HFiles should be excluded from compaction
+    * @param maxSize                        Max size for the HFiles before they roll
+    */
+  def bulkLoadThinRows[T](javaRdd: JavaRDD[T],
+                       tableName: TableName,
+                       mapFunc: Function[T, Pair[ByteArrayWrapper, FamiliesQualifiersValues]],
+                       stagingDir: String,
+                       familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
+                       compactionExclude: Boolean,
+                       maxSize: Long):
+  Unit = {
+    hbaseContext.bulkLoadThinRows[Pair[ByteArrayWrapper, FamiliesQualifiersValues]](javaRdd.map(mapFunc).rdd,
+      tableName, t => {
+        (t.getFirst, t.getSecond)
+      }, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
+  }
+
+  /**
    * This function will use the native HBase TableInputFormat with the
    * given scan object to generate a new JavaRDD
    *
@@ -341,4 +404,5 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
    */
   private[spark]
   def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
 }
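
Note that the commit ships a runnable example only for bulkLoad; a Java caller of the new
bulkLoadThinRows would look roughly like the sketch below, modeled on the test added later
in this commit (jsc and hbaseContext are assumed to be set up as in the earlier sketch,
and the table name and staging path are placeholders):

  // Each input element carries all the cells of one row; BulkLoadThinRowsFunction
  // (defined in the test below) folds them into a Pair<ByteArrayWrapper, FamiliesQualifiersValues>.
  List<List<String>> rows = new ArrayList<List<String>>();
  rows.add(Arrays.asList("1,f1,b,1"));
  rows.add(Arrays.asList("2,f1,a,3", "2,f1,b,3"));
  JavaRDD<List<String>> rdd = jsc.parallelize(rows);

  hbaseContext.bulkLoadThinRows(rdd, TableName.valueOf("bulkload-table-test"),
      new BulkLoadThinRowsFunction(), "/tmp/thin-hfiles",
      new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);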

http://git-wip-us.apache.org/repos/asf/hbase/blob/49f707fb/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
index da6b724..c3f1bcb 100644
--- a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
+++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
@@ -19,16 +19,22 @@ package org.apache.hadoop.hbase.spark;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
@@ -38,17 +44,24 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.api.java.*;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
-import org.junit.*;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import scala.Tuple2;
-
 import com.google.common.io.Files;
 
 @Category({MiscTests.class, MediumTests.class})
@@ -58,19 +71,22 @@ public class TestJavaHBaseContext implements Serializable {
   protected static final Log LOG = LogFactory.getLog(TestJavaHBaseContext.class);
 
 
+
   byte[] tableName = Bytes.toBytes("t1");
   byte[] columnFamily = Bytes.toBytes("c");
+  byte[] columnFamily1 = Bytes.toBytes("d");
   String columnFamilyStr = Bytes.toString(columnFamily);
+  String columnFamilyStr1 = Bytes.toString(columnFamily1);
+
 
   @Before
   public void setUp() {
     jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
-    jsc.addJar("spark.jar");
 
     File tempDir = Files.createTempDir();
     tempDir.deleteOnExit();
 
-    htu = HBaseTestingUtility.createLocalHTU();
+    htu = new HBaseTestingUtility();
     try {
       LOG.info("cleaning up test dir");
 
@@ -91,7 +107,7 @@ public class TestJavaHBaseContext implements Serializable {
 
       LOG.info(" - creating table " + Bytes.toString(tableName));
       htu.createTable(TableName.valueOf(tableName),
-              columnFamily);
+          new byte[][]{columnFamily, columnFamily1});
       LOG.info(" - created table");
     } catch (Exception e1) {
       throw new RuntimeException(e1);
@@ -278,6 +294,173 @@ public class TestJavaHBaseContext implements Serializable {
     Assert.assertEquals(stringJavaRDD.count(), 5);
   }
 
+  @Test
+  public void testBulkLoad() throws Exception {
+
+    Path output = htu.getDataTestDir("testBulkLoad");
+    // Add each cell as a String: "row,family,qualifier,value"
+    List<String> list = new ArrayList<String>();
+    // row1
+    list.add("1," + columnFamilyStr + ",b,1");
+    // row3
+    list.add("3," + columnFamilyStr + ",a,2");
+    list.add("3," + columnFamilyStr + ",b,1");
+    list.add("3," + columnFamilyStr1 + ",a,1");
+    // row2
+    list.add("2," + columnFamilyStr + ",a,3");
+    list.add("2," + columnFamilyStr + ",b,3");
+
+    JavaRDD<String> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    hbaseContext.bulkLoad(rdd, TableName.valueOf(tableName), new BulkLoadFunction(), output.toUri().getPath(),
+        new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
+
+    try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
+      Table table = conn.getTable(TableName.valueOf(tableName));
+      // Do bulk load
+      LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
+      load.doBulkLoad(output, admin, table, conn.getRegionLocator(TableName.valueOf(tableName)));
+
+      // Check row1
+      List<Cell> cell1 = table.get(new Get(Bytes.toBytes("1"))).listCells();
+      Assert.assertEquals(1, cell1.size());
+      Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell1.get(0))));
+      Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell1.get(0))));
+      Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell1.get(0))));
+
+      // Check row3
+      List<Cell> cell3 = table.get(new Get(Bytes.toBytes("3"))).listCells();
+      Assert.assertEquals(3, cell3.size());
+      Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell3.get(0))));
+      Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell3.get(0))));
+      Assert.assertEquals("2", Bytes.toString(CellUtil.cloneValue(cell3.get(0))));
+      Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell3.get(1))));
+      Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell3.get(1))));
+      Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell3.get(1))));
+      Assert.assertEquals(columnFamilyStr1, Bytes.toString(CellUtil.cloneFamily(cell3.get(2))));
+      Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell3.get(2))));
+      Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell3.get(2))));
+
+      // Check row2
+      List<Cell> cell2 = table.get(new Get(Bytes.toBytes("2"))).listCells();
+      Assert.assertEquals(2, cell2.size());
+      Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell2.get(0))));
+      Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell2.get(0))));
+      Assert.assertEquals("3", Bytes.toString(CellUtil.cloneValue(cell2.get(0))));
+      Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell2.get(1))));
+      Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell2.get(1))));
+      Assert.assertEquals("3", Bytes.toString(CellUtil.cloneValue(cell2.get(1))));
+    }
+  }
+
+  @Test
+  public void testBulkLoadThinRows() throws Exception {
+    Path output = htu.getDataTestDir("testBulkLoadThinRows");
+    // Because of a limitation of the Scala bulkLoadThinRows API,
+    // data must be provided as <row, all cells in that row>.
+    List<List<String>> list = new ArrayList<List<String>>();
+    // row1
+    List<String> list1 = new ArrayList<String>();
+    list1.add("1," + columnFamilyStr + ",b,1");
+    list.add(list1);
+    // row3
+    List<String> list3 = new ArrayList<String>();
+    list3.add("3," + columnFamilyStr + ",a,2");
+    list3.add("3," + columnFamilyStr + ",b,1");
+    list3.add("3," + columnFamilyStr1 + ",a,1");
+    list.add(list3);
+    // row2
+    List<String> list2 = new ArrayList<String>();
+    list2.add("2," + columnFamilyStr + ",a,3");
+    list2.add("2," + columnFamilyStr + ",b,3");
+    list.add(list2);
+
+    JavaRDD<List<String>> rdd = jsc.parallelize(list);
+
+    Configuration conf = htu.getConfiguration();
+    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+    hbaseContext.bulkLoadThinRows(rdd, TableName.valueOf(tableName), new BulkLoadThinRowsFunction(), output.toString(),
+        new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
+
+    try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
+      Table table = conn.getTable(TableName.valueOf(tableName));
+      // Do bulk load
+      LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
+      load.doBulkLoad(output, admin, table, conn.getRegionLocator(TableName.valueOf(tableName)));
+
+      // Check row1
+      List<Cell> cell1 = table.get(new Get(Bytes.toBytes("1"))).listCells();
+      Assert.assertEquals(1, cell1.size());
+      Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell1.get(0))));
+      Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell1.get(0))));
+      Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell1.get(0))));
+
+      // Check row3
+      List<Cell> cell3 = table.get(new Get(Bytes.toBytes("3"))).listCells();
+      Assert.assertEquals(3, cell3.size());
+      Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell3.get(0))));
+      Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell3.get(0))));
+      Assert.assertEquals("2", Bytes.toString(CellUtil.cloneValue(cell3.get(0))));
+      Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell3.get(1))));
+      Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell3.get(1))));
+      Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell3.get(1))));
+      Assert.assertEquals(columnFamilyStr1, Bytes.toString(CellUtil.cloneFamily(cell3.get(2))));
+      Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell3.get(2))));
+      Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell3.get(2))));
+
+      // Check row2
+      List<Cell> cell2 = table.get(new Get(Bytes.toBytes("2"))).listCells();
+      Assert.assertEquals(2, cell2.size());
+      Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell2.get(0))));
+      Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell2.get(0))));
+      Assert.assertEquals("3", Bytes.toString(CellUtil.cloneValue(cell2.get(0))));
+      Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell2.get(1))));
+      Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell2.get(1))));
+      Assert.assertEquals("3", Bytes.toString(CellUtil.cloneValue(cell2.get(1))));
+    }
+  }
+
+  public static class BulkLoadFunction implements Function<String, Pair<KeyFamilyQualifier, byte[]>> {
+
+    @Override
+    public Pair<KeyFamilyQualifier, byte[]> call(String v1) throws Exception {
+      if (v1 == null) {
+        return null;
+      }
+      String[] strs = v1.split(",");
+      if (strs.length != 4) {
+        return null;
+      }
+      KeyFamilyQualifier kfq = new KeyFamilyQualifier(Bytes.toBytes(strs[0]), Bytes.toBytes(strs[1]),
+          Bytes.toBytes(strs[2]));
+      return new Pair<>(kfq, Bytes.toBytes(strs[3]));
+    }
+  }
+
+  public static class BulkLoadThinRowsFunction implements Function<List<String>, Pair<ByteArrayWrapper, FamiliesQualifiersValues>> {
+
+    @Override
+    public Pair<ByteArrayWrapper, FamiliesQualifiersValues> call(List<String> list) throws Exception {
+      if (list == null) {
+        return null;
+      }
+      ByteArrayWrapper rowKey = null;
+      FamiliesQualifiersValues fqv = new FamiliesQualifiersValues();
+      for (String cell : list) {
+        String[] strs = cell.split(",");
+        if (rowKey == null) {
+          rowKey = new ByteArrayWrapper(Bytes.toBytes(strs[0]));
+        }
+        fqv.add(Bytes.toBytes(strs[1]), Bytes.toBytes(strs[2]), Bytes.toBytes(strs[3]));
+      }
+      return new Pair<>(rowKey, fqv);
+    }
+  }
+
   public static class GetFunction implements Function<byte[], Get> {
 
     private static final long serialVersionUID = 1L;
@@ -335,4 +518,4 @@ public class TestJavaHBaseContext implements Serializable {
     }
   }
 
-}
\ No newline at end of file
+}