Posted to commits@hbase.apache.org by bu...@apache.org on 2017/04/24 18:15:21 UTC
hbase git commit: HBASE-17933: [hbase-spark] Support Java api for bulkload
Repository: hbase
Updated Branches:
refs/heads/master 9a1aff447 -> 49f707fba
HBASE-17933: [hbase-spark] Support Java api for bulkload
Signed-off-by: Sean Busbey <bu...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/49f707fb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/49f707fb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/49f707fb
Branch: refs/heads/master
Commit: 49f707fba7c6a9f0210f387e31d1be9f108991f8
Parents: 9a1aff4
Author: Yi Liang <ea...@gmail.com>
Authored: Fri Apr 21 18:10:03 2017 -0700
Committer: Sean Busbey <bu...@apache.org>
Committed: Mon Apr 24 11:48:29 2017 -0500
----------------------------------------------------------------------
.../hbasecontext/JavaHBaseBulkLoadExample.java | 102 ++++++++++
.../hbase/spark/FamiliesQualifiersValues.scala | 12 +-
.../hadoop/hbase/spark/JavaHBaseContext.scala | 68 ++++++-
.../hbase/spark/TestJavaHBaseContext.java | 201 ++++++++++++++++++-
4 files changed, 371 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/49f707fb/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
new file mode 100644
index 0000000..040546d
--- /dev/null
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.spark.example.hbasecontext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.spark.FamilyHFileWriteOptions;
+import org.apache.hadoop.hbase.spark.JavaHBaseContext;
+import org.apache.hadoop.hbase.spark.KeyFamilyQualifier;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Run this example using the command below:
+ *
+ * SPARK_HOME/bin/spark-submit --master local[2] --class org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkLoadExample
+ * path/to/hbase-spark.jar {path/to/output/HFiles}
+ *
+ * This example will generate HFiles under {path/to/output/HFiles}, and the user can then run
+ * 'hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles' to load those HFiles into the table and verify the example.
+ */
+public final class JavaHBaseBulkLoadExample {
+ private JavaHBaseBulkLoadExample() {}
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ System.out.println("JavaHBaseBulkLoadExample " + "{outputPath}");
+ return;
+ }
+
+ String tableName = "bulkload-table-test";
+ String columnFamily1 = "f1";
+ String columnFamily2 = "f2";
+
+ SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkLoadExample " + tableName);
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+ try {
+ List<String> list = new ArrayList<String>();
+ // row1
+ list.add("1," + columnFamily1 + ",b,1");
+ // row3
+ list.add("3," + columnFamily1 + ",a,2");
+ list.add("3," + columnFamily1 + ",b,1");
+ list.add("3," + columnFamily2 + ",a,1");
+ // row2
+ list.add("2," + columnFamily2 + ",a,3");
+ list.add("2," + columnFamily2 + ",b,3");
+
+ JavaRDD<String> rdd = jsc.parallelize(list);
+
+ Configuration conf = HBaseConfiguration.create();
+ JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+ hbaseContext.bulkLoad(rdd, TableName.valueOf(tableName), new BulkLoadFunction(), args[0],
+ new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
+ } finally {
+ jsc.stop();
+ }
+ }
+
+ public static class BulkLoadFunction implements Function<String, Pair<KeyFamilyQualifier, byte[]>> {
+
+ @Override
+ public Pair<KeyFamilyQualifier, byte[]> call(String v1) throws Exception {
+ if (v1 == null) {
+ return null;
+ }
+ String[] strs = v1.split(",");
+ if (strs.length != 4) {
+ return null;
+ }
+ KeyFamilyQualifier kfq = new KeyFamilyQualifier(Bytes.toBytes(strs[0]), Bytes.toBytes(strs[1]),
+ Bytes.toBytes(strs[2]));
+ return new Pair<KeyFamilyQualifier, byte[]>(kfq, Bytes.toBytes(strs[3]));
+ }
+ }
+}
\ No newline at end of file
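After the example writes its HFiles, loading them into the table is a separate step. A minimal Java sketch of completing that step programmatically (mirroring what the new test below does), assuming a Configuration conf pointing at the target cluster and the output path passed as args[0]:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    // Move the generated HFiles into the table's regions (the actual bulk load).
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin();
         Table table = conn.getTable(TableName.valueOf("bulkload-table-test"))) {
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      loader.doBulkLoad(new Path(args[0]), admin, table,
          conn.getRegionLocator(TableName.valueOf("bulkload-table-test")));
    }

The CLI route mentioned in the class comment achieves the same result.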
http://git-wip-us.apache.org/repos/asf/hbase/blob/49f707fb/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
index 92bb3b7..7733802 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala
@@ -55,4 +55,14 @@ class FamiliesQualifiersValues extends Serializable {
qualifierValues.put(new ByteArrayWrapper(qualifier), value)
}
-}
+
+ /**
+ * A wrapper around the "+=" method above that can be called from Java,
+ * since Java code cannot invoke Scala's symbolic method names.
+ * @param family HBase column family
+ * @param qualifier HBase column qualifier
+ * @param value HBase cell value
+ */
+ def add(family: Array[Byte], qualifier: Array[Byte], value: Array[Byte]): Unit = {
+ this += (family, qualifier, value)
+ }
+}
\ No newline at end of file
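The new add method exists because Java code cannot call Scala's symbolic "+=" directly. A minimal Java sketch of accumulating one row's cells with it, using made-up family/qualifier/value strings:

    import org.apache.hadoop.hbase.spark.FamiliesQualifiersValues;
    import org.apache.hadoop.hbase.util.Bytes;

    FamiliesQualifiersValues familyQualifiersValues = new FamiliesQualifiersValues();
    // Each call has the same effect as the Scala form:
    // familyQualifiersValues += (family, qualifier, value)
    familyQualifiersValues.add(Bytes.toBytes("f1"), Bytes.toBytes("a"), Bytes.toBytes("1"));
    familyQualifiersValues.add(Bytes.toBytes("f2"), Bytes.toBytes("b"), Bytes.toBytes("2"));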
http://git-wip-us.apache.org/repos/asf/hbase/blob/49f707fb/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
index 253b386..57029f3 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala
@@ -17,9 +17,12 @@
package org.apache.hadoop.hbase.spark
+import java.util.Map
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair
+import org.apache.hadoop.hbase.classification.InterfaceAudience
import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
@@ -268,7 +271,6 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
* generates a new DStream based on Gets and the results
* they bring back from HBase
*
-
* @param tableName The name of the table to get from
* @param batchSize The number of gets to be batched together
* @param javaDStream Original DStream with data to iterate over
@@ -292,6 +294,67 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
}
/**
+ * A simple abstraction over the HBaseContext.bulkLoad method.
+ * It additionally allows a user to take a JavaRDD and, via a map
+ * function, convert it into a new JavaRDD[Pair]; HFiles will then be
+ * generated in stagingDir for bulk load.
+ *
+ * @param javaRdd The javaRDD we are bulk loading from
+ * @param tableName The HBase table we are loading into
+ * @param mapFunc A Function that will convert a value in JavaRDD
+ * to Pair(KeyFamilyQualifier, Array[Byte])
+ * @param stagingDir The location on the FileSystem to bulk load into
+ * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
+ * column family is written
+ * @param compactionExclude Compaction excluded for the HFiles
+ * @param maxSize Max size for the HFiles before they roll
+ */
+ def bulkLoad[T](javaRdd: JavaRDD[T],
+ tableName: TableName,
+ mapFunc: Function[T, Pair[KeyFamilyQualifier, Array[Byte]]],
+ stagingDir: String,
+ familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
+ compactionExclude: Boolean,
+ maxSize: Long):
+ Unit = {
+ hbaseContext.bulkLoad[Pair[KeyFamilyQualifier, Array[Byte]]](javaRdd.map(mapFunc).rdd, tableName, t => {
+ val keyFamilyQualifier = t.getFirst
+ val value = t.getSecond
+ Seq((keyFamilyQualifier, value)).iterator
+ }, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
+ }
+
+ /**
+ * A simple abstraction over the HBaseContext.bulkLoadThinRows method.
+ * It additionally allows a user to take a JavaRDD and, via a map
+ * function, convert it into a new JavaRDD[Pair]; HFiles will then be
+ * generated in stagingDir for bulk load.
+ *
+ * @param javaRdd The javaRDD we are bulk loading from
+ * @param tableName The HBase table we are loading into
+ * @param mapFunc A Function that will convert a value in JavaRDD
+ * to Pair(ByteArrayWrapper, FamiliesQualifiersValues)
+ * @param stagingDir The location on the FileSystem to bulk load into
+ * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
+ * column family is written
+ * @param compactionExclude Compaction excluded for the HFiles
+ * @param maxSize Max size for the HFiles before they roll
+ */
+ def bulkLoadThinRows[T](javaRdd: JavaRDD[T],
+ tableName: TableName,
+ mapFunc: Function[T, Pair[ByteArrayWrapper, FamiliesQualifiersValues]],
+ stagingDir: String,
+ familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
+ compactionExclude: Boolean,
+ maxSize: Long):
+ Unit = {
+ hbaseContext.bulkLoadThinRows[Pair[ByteArrayWrapper, FamiliesQualifiersValues]](javaRdd.map(mapFunc).rdd,
+ tableName, t => {
+ (t.getFirst, t.getSecond)
+ }, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
+ }
+
+ /**
* This function will use the native HBase TableInputFormat with the
* given scan object to generate a new JavaRDD
*
@@ -341,4 +404,5 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
*/
private[spark]
def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
}
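From the Java side, the two new methods only require a Function that maps each RDD element to the appropriate Pair. A minimal sketch of calling bulkLoad, assuming an existing JavaSparkContext jsc, an HBase Configuration conf, and a staging path stagingDir; bulkLoadThinRows is invoked the same way, with a Function producing Pair<ByteArrayWrapper, FamiliesQualifiersValues>:

    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
    JavaRDD<String> rdd = jsc.parallelize(
        java.util.Arrays.asList("1,f1,b,1", "3,f1,a,2"));

    // Each "row,family,qualifier,value" record is mapped to a
    // (KeyFamilyQualifier, value) pair; HFiles are written under stagingDir.
    hbaseContext.bulkLoad(rdd, TableName.valueOf("bulkload-table-test"),
        new JavaHBaseBulkLoadExample.BulkLoadFunction(), stagingDir,
        new HashMap<byte[], FamilyHFileWriteOptions>(), false,
        HConstants.DEFAULT_MAX_FILE_SIZE);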
http://git-wip-us.apache.org/repos/asf/hbase/blob/49f707fb/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
index da6b724..c3f1bcb 100644
--- a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
+++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
@@ -19,16 +19,22 @@ package org.apache.hadoop.hbase.spark;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
@@ -38,17 +44,24 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.api.java.*;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
-import org.junit.*;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
import org.junit.experimental.categories.Category;
import scala.Tuple2;
-
import com.google.common.io.Files;
@Category({MiscTests.class, MediumTests.class})
@@ -58,19 +71,22 @@ public class TestJavaHBaseContext implements Serializable {
protected static final Log LOG = LogFactory.getLog(TestJavaHBaseContext.class);
+
byte[] tableName = Bytes.toBytes("t1");
byte[] columnFamily = Bytes.toBytes("c");
+ byte[] columnFamily1 = Bytes.toBytes("d");
String columnFamilyStr = Bytes.toString(columnFamily);
+ String columnFamilyStr1 = Bytes.toString(columnFamily1);
+
@Before
public void setUp() {
jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
- jsc.addJar("spark.jar");
File tempDir = Files.createTempDir();
tempDir.deleteOnExit();
- htu = HBaseTestingUtility.createLocalHTU();
+ htu = new HBaseTestingUtility();
try {
LOG.info("cleaning up test dir");
@@ -91,7 +107,7 @@ public class TestJavaHBaseContext implements Serializable {
LOG.info(" - creating table " + Bytes.toString(tableName));
htu.createTable(TableName.valueOf(tableName),
- columnFamily);
+ new byte[][]{columnFamily, columnFamily1});
LOG.info(" - created table");
} catch (Exception e1) {
throw new RuntimeException(e1);
@@ -278,6 +294,173 @@ public class TestJavaHBaseContext implements Serializable {
Assert.assertEquals(stringJavaRDD.count(), 5);
}
+ @Test
+ public void testBulkLoad() throws Exception {
+
+ Path output = htu.getDataTestDir("testBulkLoad");
+ // Add each cell as a String: "row,family,qualifier,value"
+ List<String> list = new ArrayList<String>();
+ // row1
+ list.add("1," + columnFamilyStr + ",b,1");
+ // row3
+ list.add("3," + columnFamilyStr + ",a,2");
+ list.add("3," + columnFamilyStr + ",b,1");
+ list.add("3," + columnFamilyStr1 + ",a,1");
+ // row2
+ list.add("2," + columnFamilyStr + ",a,3");
+ list.add("2," + columnFamilyStr + ",b,3");
+
+ JavaRDD<String> rdd = jsc.parallelize(list);
+
+ Configuration conf = htu.getConfiguration();
+ JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+ hbaseContext.bulkLoad(rdd, TableName.valueOf(tableName), new BulkLoadFunction(), output.toUri().getPath(),
+ new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
+
+ try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
+ Table table = conn.getTable(TableName.valueOf(tableName));
+ // Do bulk load
+ LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
+ load.doBulkLoad(output, admin, table, conn.getRegionLocator(TableName.valueOf(tableName)));
+
+ // Check row1
+ List<Cell> cell1 = table.get(new Get(Bytes.toBytes("1"))).listCells();
+ Assert.assertEquals(1, cell1.size());
+ Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell1.get(0))));
+ Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell1.get(0))));
+ Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell1.get(0))));
+
+ // Check row3
+ List<Cell> cell3 = table.get(new Get(Bytes.toBytes("3"))).listCells();
+ Assert.assertEquals(3, cell3.size());
+ Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell3.get(0))));
+ Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell3.get(0))));
+ Assert.assertEquals("2", Bytes.toString(CellUtil.cloneValue(cell3.get(0))));
+ Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell3.get(1))));
+ Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell3.get(1))));
+ Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell3.get(1))));
+ Assert.assertEquals(columnFamilyStr1, Bytes.toString(CellUtil.cloneFamily(cell3.get(2))));
+ Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell3.get(2))));
+ Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell3.get(2))));
+
+ // Check row2
+ List<Cell> cell2 = table.get(new Get(Bytes.toBytes("2"))).listCells();
+ Assert.assertEquals(2, cell2.size());
+ Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell2.get(0))));
+ Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell2.get(0))));
+ Assert.assertEquals("3", Bytes.toString(CellUtil.cloneValue(cell2.get(0))));
+ Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell2.get(1))));
+ Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell2.get(1))));
+ Assert.assertEquals("3", Bytes.toString(CellUtil.cloneValue(cell2.get(1))));
+ }
+ }
+
+ @Test
+ public void testBulkLoadThinRows() throws Exception {
+ Path output = htu.getDataTestDir("testBulkLoadThinRows");
+ // Because the Scala bulkLoadThinRows API expects one entry per row,
+ // we need to provide the data as <row, all cells in that row>.
+ List<List<String>> list = new ArrayList<List<String>>();
+ // row1
+ List<String> list1 = new ArrayList<String>();
+ list1.add("1," + columnFamilyStr + ",b,1");
+ list.add(list1);
+ // row3
+ List<String> list3 = new ArrayList<String>();
+ list3.add("3," + columnFamilyStr + ",a,2");
+ list3.add("3," + columnFamilyStr + ",b,1");
+ list3.add("3," + columnFamilyStr1 + ",a,1");
+ list.add(list3);
+ // row2
+ List<String> list2 = new ArrayList<String>();
+ list2.add("2," + columnFamilyStr + ",a,3");
+ list2.add("2," + columnFamilyStr + ",b,3");
+ list.add(list2);
+
+ JavaRDD<List<String>> rdd = jsc.parallelize(list);
+
+ Configuration conf = htu.getConfiguration();
+ JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+ hbaseContext.bulkLoadThinRows(rdd, TableName.valueOf(tableName), new BulkLoadThinRowsFunction(), output.toString(),
+ new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
+
+ try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
+ Table table = conn.getTable(TableName.valueOf(tableName));
+ // Do bulk load
+ LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
+ load.doBulkLoad(output, admin, table, conn.getRegionLocator(TableName.valueOf(tableName)));
+
+ // Check row1
+ List<Cell> cell1 = table.get(new Get(Bytes.toBytes("1"))).listCells();
+ Assert.assertEquals(1, cell1.size());
+ Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell1.get(0))));
+ Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell1.get(0))));
+ Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell1.get(0))));
+
+ // Check row3
+ List<Cell> cell3 = table.get(new Get(Bytes.toBytes("3"))).listCells();
+ Assert.assertEquals(3, cell3.size());
+ Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell3.get(0))));
+ Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell3.get(0))));
+ Assert.assertEquals("2", Bytes.toString(CellUtil.cloneValue(cell3.get(0))));
+ Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell3.get(1))));
+ Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell3.get(1))));
+ Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell3.get(1))));
+ Assert.assertEquals(columnFamilyStr1, Bytes.toString(CellUtil.cloneFamily(cell3.get(2))));
+ Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell3.get(2))));
+ Assert.assertEquals("1", Bytes.toString(CellUtil.cloneValue(cell3.get(2))));
+
+ // Check row2
+ List<Cell> cell2 = table.get(new Get(Bytes.toBytes("2"))).listCells();
+ Assert.assertEquals(2, cell2.size());
+ Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell2.get(0))));
+ Assert.assertEquals("a", Bytes.toString(CellUtil.cloneQualifier(cell2.get(0))));
+ Assert.assertEquals("3", Bytes.toString(CellUtil.cloneValue(cell2.get(0))));
+ Assert.assertEquals(columnFamilyStr, Bytes.toString(CellUtil.cloneFamily(cell2.get(1))));
+ Assert.assertEquals("b", Bytes.toString(CellUtil.cloneQualifier(cell2.get(1))));
+ Assert.assertEquals("3", Bytes.toString(CellUtil.cloneValue(cell2.get(1))));
+ }
+ }
+
+ public static class BulkLoadFunction implements Function<String, Pair<KeyFamilyQualifier, byte[]>> {
+
+ @Override
+ public Pair<KeyFamilyQualifier, byte[]> call(String v1) throws Exception {
+ if (v1 == null) {
+ return null;
+ }
+ String[] strs = v1.split(",");
+ if (strs.length != 4) {
+ return null;
+ }
+ KeyFamilyQualifier kfq = new KeyFamilyQualifier(Bytes.toBytes(strs[0]), Bytes.toBytes(strs[1]),
+ Bytes.toBytes(strs[2]));
+ return new Pair<KeyFamilyQualifier, byte[]>(kfq, Bytes.toBytes(strs[3]));
+ }
+ }
+
+ public static class BulkLoadThinRowsFunction implements Function<List<String>, Pair<ByteArrayWrapper, FamiliesQualifiersValues>> {
+
+ @Override
+ public Pair<ByteArrayWrapper, FamiliesQualifiersValues> call(List<String> list) throws Exception {
+ if (list == null) {
+ return null;
+ }
+ ByteArrayWrapper rowKey = null;
+ FamiliesQualifiersValues fqv = new FamiliesQualifiersValues();
+ for (String cell : list) {
+ String[] strs = cell.split(",");
+ if (rowKey == null) {
+ rowKey = new ByteArrayWrapper(Bytes.toBytes(strs[0]));
+ }
+ fqv.add(Bytes.toBytes(strs[1]), Bytes.toBytes(strs[2]), Bytes.toBytes(strs[3]));
+ }
+ return new Pair<ByteArrayWrapper, FamiliesQualifiersValues>(rowKey, fqv);
+ }
+ }
+
public static class GetFunction implements Function<byte[], Get> {
private static final long serialVersionUID = 1L;
@@ -335,4 +518,4 @@ public class TestJavaHBaseContext implements Serializable {
}
}
-}
\ No newline at end of file
+}