Posted to commits@hbase.apache.org by te...@apache.org on 2015/08/12 17:29:14 UTC
hbase git commit: HBASE-14150 Add BulkLoad functionality to HBase-Spark Module (Ted Malaska)
Repository: hbase
Updated Branches:
refs/heads/master aa3538f80 -> 72a48a133
HBASE-14150 Add BulkLoad functionality to HBase-Spark Module (Ted Malaska)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/72a48a13
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/72a48a13
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/72a48a13
Branch: refs/heads/master
Commit: 72a48a1333f6c01c46cd244439198ccce3f941ac
Parents: aa3538f
Author: tedyu <yu...@gmail.com>
Authored: Wed Aug 12 08:29:03 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Wed Aug 12 08:29:03 2015 -0700
----------------------------------------------------------------------
.../hbase/spark/BulkLoadPartitioner.scala | 56 ++
.../hbase/spark/FamilyHFileWriteOptions.scala | 35 ++
.../hadoop/hbase/spark/HBaseContext.scala | 294 +++++++++-
.../hadoop/hbase/spark/HBaseRDDFunctions.scala | 47 +-
.../hadoop/hbase/spark/KeyFamilyQualifier.scala | 46 ++
.../hadoop/hbase/spark/BulkLoadSuite.scala | 537 +++++++++++++++++++
6 files changed, 1002 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/72a48a13/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
new file mode 100644
index 0000000..c51a3af
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import java.util
+import java.util.Comparator
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.Partitioner
+
+/**
+ * A Partitioner implementation that will separate records into different
+ * HBase Regions based on region split points
+ *
+ * @param startKeys The start keys for the given table
+ */
+class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
+ extends Partitioner {
+
+ override def numPartitions: Int = startKeys.length
+
+ override def getPartition(key: Any): Int = {
+
+ val rowKey:Array[Byte] =
+ key match {
+ case qualifier: KeyFamilyQualifier =>
+ qualifier.rowKey
+ case _ =>
+ key.asInstanceOf[Array[Byte]]
+ }
+
+ val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
+ override def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
+ Bytes.compareTo(o1, o2)
+ }
+ }
+ val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
+    // binarySearch returns -(insertionPoint) - 1 when the key is not an
+    // exact match; the owning region is the one before the insertion point
+    if (partition < 0) -partition - 2
+ else partition
+ }
+}
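
For reference, a minimal usage sketch of the partitioner; the split keys and
expected partition numbers mirror the "bulkLoad partitioner tests" in
BulkLoadSuite below:

  import org.apache.hadoop.hbase.spark.BulkLoadPartitioner
  import org.apache.hadoop.hbase.util.Bytes

  object BulkLoadPartitionerSketch {
    def main(args: Array[String]): Unit = {
      // Region start keys; the first region always starts at the empty byte array
      val startKeys = Array(Bytes.toBytes(""), Bytes.toBytes("3"), Bytes.toBytes("7"))
      val partitioner = new BulkLoadPartitioner(startKeys)

      // "2" sorts before start key "3", so it lands in the first region (partition 0);
      // "3" matches a start key exactly, so it lands in partition 1
      println(partitioner.getPartition(Bytes.toBytes("2"))) // 0
      println(partitioner.getPartition(Bytes.toBytes("3"))) // 1
      println(partitioner.getPartition(Bytes.toBytes("8"))) // 2
    }
  }
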
http://git-wip-us.apache.org/repos/asf/hbase/blob/72a48a13/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala
new file mode 100644
index 0000000..7dbe140
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import java.io.Serializable
+
+/**
+ * This object holds the optional parameters that control how a given
+ * column family's HFile writer behaves
+ *
+ * @param compression String to define the Compression to be used in the HFile
+ * @param bloomType String to define the bloom type to be used in the HFile
+ * @param blockSize The block size to be used in the HFile
+ * @param dataBlockEncoding String to define the data block encoding to be used
+ * in the HFile
+ */
+class FamilyHFileWriteOptions( val compression:String,
+ val bloomType: String,
+ val blockSize: Int,
+ val dataBlockEncoding: String) extends Serializable
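
A short sketch of building the per-family options map that bulkLoad accepts;
the values ("GZ", "ROW", 128, "PREFIX") mirror the custom-config test in
BulkLoadSuite below, and any family without an entry falls back to the defaults:

  import java.util
  import org.apache.hadoop.hbase.spark.FamilyHFileWriteOptions
  import org.apache.hadoop.hbase.util.Bytes

  // Keyed by column family; "f1" gets GZ compression, a ROW bloom filter,
  // a 128-byte block size and PREFIX data block encoding
  val familyOptions = new util.HashMap[Array[Byte], FamilyHFileWriteOptions]
  familyOptions.put(Bytes.toBytes("f1"),
    new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX"))
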
http://git-wip-us.apache.org/repos/asf/hbase/blob/72a48a13/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index f060fea..9d14e22 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -17,31 +17,35 @@
package org.apache.hadoop.hbase.spark
-import org.apache.hadoop.hbase.TableName
+import java.net.InetSocketAddress
+import java.util
+
+import org.apache.hadoop.hbase.fs.HFileSystem
+import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.io.compress.Compression
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
+import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder, HFileWriterImpl}
+import org.apache.hadoop.hbase.regionserver.{HStore, StoreFile, BloomType}
+import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.client.ConnectionFactory
-import org.apache.hadoop.hbase.client.Scan
-import org.apache.hadoop.hbase.client.Get
-import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.hadoop.hbase.client._
import scala.reflect.ClassTag
-import org.apache.hadoop.hbase.client.Connection
-import org.apache.hadoop.hbase.client.Put
-import org.apache.hadoop.hbase.client.Delete
import org.apache.spark.{Logging, SerializableWritable, SparkContext}
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
+import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
+TableInputFormat, IdentityTableMapper}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.hbase.client.Mutation
import org.apache.spark.streaming.dstream.DStream
import java.io._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat
-import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
import org.apache.hadoop.fs.{Path, FileSystem}
+import scala.collection.mutable
/**
* HBaseContext is a façade for HBase operations
@@ -567,4 +571,270 @@ class HBaseContext(@transient sc: SparkContext,
*/
private[spark]
def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+ /**
+ * A Spark Implementation of HBase Bulk load
+ *
+ * This will take the content from an existing RDD then sort and shuffle
+ * it with respect to region splits. The result of that sort and shuffle
+ * will be written to HFiles.
+ *
+ * After this function is executed the user will have to call
+ * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
+ *
+ * Also note this version of bulk load is different from past versions in
+ * that it includes the qualifier as part of the sort process. The
+   * reason for this is to be able to support rows with a very large
+   * number of columns.
+ *
+ * @param rdd The RDD we are bulk loading from
+ * @param tableName The HBase table we are loading into
+   * @param flatMap A flatMap function that will make every
+   *                row in the RDD
+   *                into N cells for the bulk load
+ * @param stagingDir The location on the FileSystem to bulk load into
+ * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
+ * column family is written
+   * @param compactionExclude Whether the generated HFiles should be excluded from minor compaction
+ * @param maxSize Max size for the HFiles before they roll
+ * @tparam T The Type of values in the original RDD
+ */
+ def bulkLoad[T](rdd:RDD[T],
+ tableName: TableName,
+ flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
+ stagingDir:String,
+ familyHFileWriteOptionsMap:
+ util.Map[Array[Byte], FamilyHFileWriteOptions] =
+ new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
+ compactionExclude: Boolean = false,
+ maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
+ Unit = {
+ val conn = ConnectionFactory.createConnection(config)
+ val regionLocator = conn.getRegionLocator(tableName)
+ val startKeys = regionLocator.getStartKeys
+ val defaultCompressionStr = config.get("hfile.compression",
+ Compression.Algorithm.NONE.getName)
+ val defaultCompression = HFileWriterImpl
+ .compressionByName(defaultCompressionStr)
+ val now = System.currentTimeMillis()
+ val tableNameByteArray = tableName.getName
+
+ val familyHFileWriteOptionsMapInternal =
+ new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
+
+ val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
+
+ while (entrySetIt.hasNext) {
+ val entry = entrySetIt.next()
+ familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
+ }
+
+ /**
+ * This will return a new HFile writer when requested
+ *
+ * @param family column family
+ * @param conf configuration to connect to HBase
+     * @param favoredNodes nodes that we would like to write to
+     * @param fs FileSystem object where we will be writing the HFiles to
+     * @param familydir the column family directory to write the HFiles into
+     * @return WriterLength object
+ */
+ def getNewWriter(family: Array[Byte], conf: Configuration,
+ favoredNodes: Array[InetSocketAddress],
+ fs:FileSystem,
+ familydir:Path): WriterLength = {
+
+
+ var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))
+
+ if (familyOptions == null) {
+ familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
+ BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
+ familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
+ }
+
+ val tempConf = new Configuration(conf)
+ tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
+ val contextBuilder = new HFileContextBuilder()
+ .withCompression(Algorithm.valueOf(familyOptions.compression))
+ .withChecksumType(HStore.getChecksumType(conf))
+ .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+ .withBlockSize(familyOptions.blockSize)
+ contextBuilder.withDataBlockEncoding(DataBlockEncoding.
+ valueOf(familyOptions.dataBlockEncoding))
+ val hFileContext = contextBuilder.build()
+
+ if (null == favoredNodes) {
+ new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+ .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
+ .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
+ } else {
+ new WriterLength(0,
+ new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+ .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
+ .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+ .withFavoredNodes(favoredNodes).build())
+ }
+ }
+
+ val regionSplitPartitioner =
+ new BulkLoadPartitioner(startKeys)
+
+ //This is where all the magic happens
+ //Here we are going to do the following things
+    // 1. FlatMap every row in the RDD into key column value tuples
+    // 2. Then we are going to repartition, sort and shuffle
+ // 3. Finally we are going to write out our HFiles
+ rdd.flatMap( r => flatMap(r)).
+ repartitionAndSortWithinPartitions(regionSplitPartitioner).
+ hbaseForeachPartition(this, (it, conn) => {
+
+ val conf = broadcastedConf.value.value
+ val fs = FileSystem.get(conf)
+ val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
+ var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
+ var rollOverRequested = false
+
+ /**
+ * This will roll all writers
+ */
+ def rollWriters(): Unit = {
+ writerMap.values.foreach( wl => {
+ if (wl.writer != null) {
+ logDebug("Writer=" + wl.writer.getPath +
+ (if (wl.written == 0) "" else ", wrote=" + wl.written))
+ close(wl.writer)
+ }
+ })
+ writerMap.clear()
+ rollOverRequested = false
+ }
+
+ /**
+ * This function will close a given HFile writer
+ * @param w The writer to close
+ */
+ def close(w:StoreFile.Writer): Unit = {
+ if (w != null) {
+ w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+ Bytes.toBytes(System.currentTimeMillis()))
+ w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+ Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
+ w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+ Bytes.toBytes(true))
+ w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+ Bytes.toBytes(compactionExclude))
+ w.appendTrackedTimestampsToMetadata()
+ w.close()
+ }
+ }
+
+ //Here is where we finally iterate through the data in this partition of the
+ //RDD that has been sorted and partitioned
+ it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
+
+ //This will get a writer for the column family
+ //If there is no writer for a given column family then
+ //it will get created here.
+ val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(keyFamilyQualifier.family), {
+
+ val familyDir = new Path(stagingDir, Bytes.toString(keyFamilyQualifier.family))
+
+ fs.mkdirs(familyDir)
+
+ val loc:HRegionLocation = {
+ try {
+ val locator =
+ conn.getRegionLocator(TableName.valueOf(tableNameByteArray))
+ locator.getRegionLocation(keyFamilyQualifier.rowKey)
+ } catch {
+ case e: Throwable =>
+              logWarning("failed to locate region for rowkey: " +
+ Bytes.toString(keyFamilyQualifier.rowKey))
+ null
+ }
+ }
+ if (null == loc) {
+ if (log.isTraceEnabled) {
+ logTrace("failed to get region location, so use default writer: " +
+ Bytes.toString(keyFamilyQualifier.rowKey))
+ }
+ getNewWriter(family = keyFamilyQualifier.family, conf = conf, favoredNodes = null,
+ fs = fs, familydir = familyDir)
+ } else {
+ if (log.isDebugEnabled) {
+ logDebug("first rowkey: [" + Bytes.toString(keyFamilyQualifier.rowKey) + "]")
+ }
+ val initialIsa =
+ new InetSocketAddress(loc.getHostname, loc.getPort)
+ if (initialIsa.isUnresolved) {
+ if (log.isTraceEnabled) {
+ logTrace("failed to resolve bind address: " + loc.getHostname + ":"
+ + loc.getPort + ", so use default writer")
+ }
+ getNewWriter(keyFamilyQualifier.family, conf, null, fs, familyDir)
+ } else {
+ if(log.isDebugEnabled) {
+ logDebug("use favored nodes writer: " + initialIsa.getHostString)
+ }
+ getNewWriter(keyFamilyQualifier.family, conf,
+ Array[InetSocketAddress](initialIsa), fs, familyDir)
+ }
+ }
+ })
+
+        val keyValue = new KeyValue(keyFamilyQualifier.rowKey,
+          keyFamilyQualifier.family,
+          keyFamilyQualifier.qualifier,
+          now, cellValue)
+
+ wl.writer.append(keyValue)
+ wl.written += keyValue.getLength
+
+ rollOverRequested = rollOverRequested || wl.written > maxSize
+
+ //This will only roll if we have at least one column family file that is
+        //bigger than maxSize and we have finished a given row key
+ if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
+ rollWriters()
+ }
+
+ previousRow = keyFamilyQualifier.rowKey
+ }
+ //We have finished all the data so lets close up the writers
+ rollWriters()
+ })
+ }
+
+ /**
+ * This is a wrapper class around StoreFile.Writer. The reason for the
+   * wrapper is to keep the length of the file alongside the writer
+ *
+   * @param written The number of bytes written to the writer
+   * @param writer The wrapped StoreFile.Writer
+ */
+ class WriterLength(var written:Long, val writer:StoreFile.Writer)
+
+ /**
+   * This is a wrapper over a byte array so it can be used as
+   * a key in a HashMap
+   *
+   * @param o1 The byte array to wrap
+ */
+ class ByteArrayWrapper (val o1:Array[Byte])
+ extends Comparable[ByteArrayWrapper] with Serializable {
+ override def compareTo(o2: ByteArrayWrapper): Int = {
+ Bytes.compareTo(o1,o2.o1)
+ }
+ override def equals(o2: Any): Boolean = {
+ o2 match {
+ case wrapper: ByteArrayWrapper =>
+ Bytes.equals(o1, wrapper.o1)
+ case _ =>
+ false
+ }
+ }
+ override def hashCode():Int = {
+ Bytes.hashCode(o1)
+ }
+ }
}
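
Pulling the pieces together, a condensed end-to-end sketch distilled from the
BulkLoadSuite tests below. It assumes a live SparkContext sc, an HBase
Configuration config, and an existing table t1 with column family f1; the
staging path is illustrative:

  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.ConnectionFactory
  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
  import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
  import org.apache.hadoop.hbase.util.Bytes

  val hbaseContext = new HBaseContext(sc, config)
  // (rowKey, family, qualifier, value) tuples to load
  val rdd = sc.parallelize(Array(
    (Bytes.toBytes("1"), Bytes.toBytes("f1"), Bytes.toBytes("a"), Bytes.toBytes("foo1"))))

  // Sorts and shuffles by (rowKey, family, qualifier), then writes HFiles into
  // one subdirectory per column family under the staging directory
  hbaseContext.bulkLoad[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])](rdd,
    TableName.valueOf("t1"),
    t => Seq((new KeyFamilyQualifier(t._1, t._2, t._3), t._4)).iterator,
    "/tmp/bulkload-staging")

  // bulkLoad only stages HFiles; LoadIncrementalHFiles moves them into the regions
  val conn = ConnectionFactory.createConnection(config)
  val table = conn.getTable(TableName.valueOf("t1"))
  new LoadIncrementalHFiles(config).doBulkLoad(new Path("/tmp/bulkload-staging"),
    conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf("t1")))
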
http://git-wip-us.apache.org/repos/asf/hbase/blob/72a48a13/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
index fb8456d..7c59145 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
@@ -17,11 +17,15 @@
package org.apache.hadoop.hbase.spark
-import org.apache.hadoop.hbase.TableName
+import java.util
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hbase.{HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD
+import scala.collection.immutable.HashMap
import scala.reflect.ClassTag
/**
@@ -158,5 +162,46 @@ object HBaseRDDFunctions
RDD[R] = {
hc.mapPartitions[T,R](rdd, f)
}
+
+ /**
+ * Implicit method that gives easy access to HBaseContext's
+ * bulkLoad method.
+ *
+ * A Spark Implementation of HBase Bulk load
+ *
+ * This will take the content from an existing RDD then sort and shuffle
+ * it with respect to region splits. The result of that sort and shuffle
+ * will be written to HFiles.
+ *
+ * After this function is executed the user will have to call
+ * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
+ *
+ * Also note this version of bulk load is different from past versions in
+ * that it includes the qualifier as part of the sort process. The
+   * reason for this is to be able to support rows with a very large
+   * number of columns.
+ *
+   * @param hc The HBaseContext to use for the bulk load
+   * @param tableName The HBase table we are loading into
+   * @param flatMap A flatMap function that will make every row in the RDD
+   *                into N cells for the bulk load
+ * @param stagingDir The location on the FileSystem to bulk load into
+ * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
+ * column family is written
+   * @param compactionExclude Whether the generated HFiles should be excluded from minor compaction
+ * @param maxSize Max size for the HFiles before they roll
+ */
+ def hbaseBulkLoad(hc: HBaseContext,
+ tableName: TableName,
+ flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
+ stagingDir:String,
+ familyHFileWriteOptionsMap:
+ util.Map[Array[Byte], FamilyHFileWriteOptions] =
+ new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
+ compactionExclude: Boolean = false,
+ maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = {
+ hc.bulkLoad(rdd, tableName,
+ flatMap, stagingDir, familyHFileWriteOptionsMap,
+ compactionExclude, maxSize)
+ }
}
}
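
And the implicit form this diff adds, assuming the hbaseContext and rdd from
the sketch above; importing HBaseRDDFunctions._ enriches the RDD with
hbaseBulkLoad:

  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
  import org.apache.hadoop.hbase.spark.KeyFamilyQualifier

  // The options map, compactionExclude and maxSize parameters keep their defaults here
  rdd.hbaseBulkLoad(hbaseContext,
    TableName.valueOf("t1"),
    (t: (Array[Byte], Array[Byte], Array[Byte], Array[Byte])) =>
      Seq((new KeyFamilyQualifier(t._1, t._2, t._3), t._4)).iterator,
    "/tmp/bulkload-staging")
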
http://git-wip-us.apache.org/repos/asf/hbase/blob/72a48a13/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
new file mode 100644
index 0000000..b2eda2c
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import java.io.Serializable
+
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * This is the key to be used for sorting and shuffling.
+ *
+ * We will only partition on the rowKey but we will sort on all three
+ *
+ * @param rowKey Record RowKey
+ * @param family Record ColumnFamily
+ * @param qualifier Cell Qualifier
+ */
+class KeyFamilyQualifier(val rowKey:Array[Byte], val family:Array[Byte], val qualifier:Array[Byte])
+ extends Comparable[KeyFamilyQualifier] with Serializable {
+ override def compareTo(o: KeyFamilyQualifier): Int = {
+ var result = Bytes.compareTo(rowKey, o.rowKey)
+ if (result == 0) {
+ result = Bytes.compareTo(family, o.family)
+ if (result == 0) result = Bytes.compareTo(qualifier, o.qualifier)
+ }
+ result
+ }
+ override def toString: String = {
+ Bytes.toString(rowKey) + ":" + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
+ }
+}
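
A small sketch of the ordering this key imposes, which is what makes the cells
arrive at the HFile writers in (rowKey, family, qualifier) order after the
shuffle:

  import org.apache.hadoop.hbase.spark.KeyFamilyQualifier
  import org.apache.hadoop.hbase.util.Bytes

  val a = new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("f1"), Bytes.toBytes("a"))
  val b = new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("f1"), Bytes.toBytes("b"))
  val c = new KeyFamilyQualifier(Bytes.toBytes("row2"), Bytes.toBytes("f1"), Bytes.toBytes("a"))

  assert(a.compareTo(b) < 0) // same row and family, so the qualifier breaks the tie
  assert(b.compareTo(c) < 0) // different rows compare first, before family or qualifier
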
http://git-wip-us.apache.org/repos/asf/hbase/blob/72a48a13/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
new file mode 100644
index 0000000..2e5381a
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hbase.client.{Get, ConnectionFactory}
+import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile}
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
+import org.apache.hadoop.hbase.{HConstants, CellUtil, HBaseTestingUtility, TableName}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.spark.{SparkContext, Logging}
+import org.junit.rules.TemporaryFolder
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
+class BulkLoadSuite extends FunSuite with
+BeforeAndAfterEach with BeforeAndAfterAll with Logging {
+ @transient var sc: SparkContext = null
+ var TEST_UTIL = new HBaseTestingUtility
+
+ val tableName = "t1"
+ val columnFamily1 = "f1"
+ val columnFamily2 = "f2"
+ val testFolder = new TemporaryFolder()
+
+
+ override def beforeAll() {
+ TEST_UTIL.startMiniCluster()
+ logInfo(" - minicluster started")
+
+ try {
+ TEST_UTIL.deleteTable(TableName.valueOf(tableName))
+ } catch {
+ case e: Exception =>
+ logInfo(" - no table " + tableName + " found")
+ }
+
+ logInfo(" - created table")
+
+ val envMap = Map[String,String](("Xmx", "512m"))
+
+ sc = new SparkContext("local", "test", null, Nil, envMap)
+ }
+
+ override def afterAll() {
+    logInfo("shutting down minicluster")
+ TEST_UTIL.shutdownMiniCluster()
+ logInfo(" - minicluster shut down")
+ TEST_UTIL.cleanupTestDir()
+ sc.stop()
+ }
+
+ test("Basic Test multi family and multi column tests " +
+ "with all default HFile Configs") {
+ val config = TEST_UTIL.getConfiguration
+
+ logInfo(" - creating table " + tableName)
+ TEST_UTIL.createTable(TableName.valueOf(tableName),
+ Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))
+
+ //There are a number of tests in here.
+ // 1. Row keys are not in order
+ // 2. Qualifiers are not in order
+ // 3. Column Families are not in order
+ // 4. There are tests for records in one column family and some in two column families
+    // 5. There are records with a single qualifier and some with two
+ val rdd = sc.parallelize(Array(
+ (Bytes.toBytes("1"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
+ (Bytes.toBytes("3"),
+ Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a")))),
+ (Bytes.toBytes("3"),
+ Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b")))),
+ (Bytes.toBytes("3"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c")))),
+ (Bytes.toBytes("5"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))),
+ (Bytes.toBytes("4"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))),
+ (Bytes.toBytes("4"),
+ Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))),
+ (Bytes.toBytes("2"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))),
+ (Bytes.toBytes("2"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))))
+
+ val hbaseContext = new HBaseContext(sc, config)
+
+ testFolder.create()
+ val stagingFolder = testFolder.newFolder()
+
+ hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
+ TableName.valueOf(tableName),
+ t => {
+ val rowKey = t._1
+ val family:Array[Byte] = t._2(0)._1
+ val qualifier = t._2(0)._2
+ val value = t._2(0)._3
+
+        val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
+
+ Seq((keyFamilyQualifier, value)).iterator
+ },
+ stagingFolder.getPath)
+
+ val fs = FileSystem.get(config)
+ assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)
+
+ val conn = ConnectionFactory.createConnection(config)
+
+ val load = new LoadIncrementalHFiles(config)
+ val table = conn.getTable(TableName.valueOf(tableName))
+ try {
+ load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table,
+ conn.getRegionLocator(TableName.valueOf(tableName)))
+
+ val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
+ assert(cells5.size == 1)
+ assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
+
+ val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
+ assert(cells4.size == 2)
+ assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
+ assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
+
+ val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
+ assert(cells3.size == 3)
+ assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
+ assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
+ assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))
+
+
+ val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
+ assert(cells2.size == 2)
+ assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
+ assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
+
+ val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
+ assert(cells1.size == 1)
+ assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
+
+ } finally {
+ table.close()
+ val admin = ConnectionFactory.createConnection(config).getAdmin
+ try {
+ admin.disableTable(TableName.valueOf(tableName))
+ admin.deleteTable(TableName.valueOf(tableName))
+ } finally {
+ admin.close()
+ }
+ fs.delete(new Path(stagingFolder.getPath), true)
+
+ testFolder.delete()
+
+ }
+ }
+
+ test("bulkLoad to test HBase client: Test Roll Over and " +
+ "using an implicit call to bulk load") {
+ val config = TEST_UTIL.getConfiguration
+
+ logInfo(" - creating table " + tableName)
+ TEST_UTIL.createTable(TableName.valueOf(tableName),
+ Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))
+
+ //There are a number of tests in here.
+ // 1. Row keys are not in order
+ // 2. Qualifiers are not in order
+ // 3. Column Families are not in order
+ // 4. There are tests for records in one column family and some in two column families
+    // 5. There are records with a single qualifier and some with two
+ val rdd = sc.parallelize(Array(
+ (Bytes.toBytes("1"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
+ (Bytes.toBytes("3"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b")))),
+ (Bytes.toBytes("3"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a")))),
+ (Bytes.toBytes("3"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c")))),
+ (Bytes.toBytes("5"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))),
+ (Bytes.toBytes("4"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))),
+ (Bytes.toBytes("4"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))),
+ (Bytes.toBytes("2"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))),
+ (Bytes.toBytes("2"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))))
+
+ val hbaseContext = new HBaseContext(sc, config)
+
+ testFolder.create()
+ val stagingFolder = testFolder.newFolder()
+
+ rdd.hbaseBulkLoad(hbaseContext,
+ TableName.valueOf(tableName),
+ t => {
+ val rowKey = t._1
+ val family:Array[Byte] = t._2(0)._1
+ val qualifier = t._2(0)._2
+ val value = t._2(0)._3
+
+        val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
+
+ Seq((keyFamilyQualifier, value)).iterator
+ },
+ stagingFolder.getPath,
+ new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
+ compactionExclude = false,
+ 20)
+
+ val fs = FileSystem.get(config)
+ assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 1)
+
+    assert(fs.listStatus(new Path(stagingFolder.getPath + "/f1")).length == 5)
+
+ val conn = ConnectionFactory.createConnection(config)
+
+ val load = new LoadIncrementalHFiles(config)
+ val table = conn.getTable(TableName.valueOf(tableName))
+ try {
+ load.doBulkLoad(new Path(stagingFolder.getPath),
+ conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+
+ val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
+ assert(cells5.size == 1)
+ assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
+
+ val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
+ assert(cells4.size == 2)
+ assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
+ assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
+
+ val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
+ assert(cells3.size == 3)
+ assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.a"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
+ assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("b"))
+ assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.c"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("c"))
+
+ val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
+ assert(cells2.size == 2)
+ assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
+ assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
+
+ val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
+ assert(cells1.size == 1)
+ assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
+
+ } finally {
+ table.close()
+ val admin = ConnectionFactory.createConnection(config).getAdmin
+ try {
+ admin.disableTable(TableName.valueOf(tableName))
+ admin.deleteTable(TableName.valueOf(tableName))
+ } finally {
+ admin.close()
+ }
+ fs.delete(new Path(stagingFolder.getPath), true)
+
+ testFolder.delete()
+ }
+ }
+
+ test("Basic Test multi family and multi column tests" +
+ " with one column family with custom configs plus multi region") {
+ val config = TEST_UTIL.getConfiguration
+
+ val splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](2)
+ splitKeys(0) = Bytes.toBytes("2")
+ splitKeys(1) = Bytes.toBytes("4")
+
+ logInfo(" - creating table " + tableName)
+ TEST_UTIL.createTable(TableName.valueOf(tableName),
+ Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)),
+ splitKeys)
+
+ //There are a number of tests in here.
+ // 1. Row keys are not in order
+ // 2. Qualifiers are not in order
+ // 3. Column Families are not in order
+ // 4. There are tests for records in one column family and some in two column families
+    // 5. There are records with a single qualifier and some with two
+ val rdd = sc.parallelize(Array(
+ (Bytes.toBytes("1"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
+ (Bytes.toBytes("3"),
+ Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a")))),
+ (Bytes.toBytes("3"),
+ Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b")))),
+ (Bytes.toBytes("3"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c")))),
+ (Bytes.toBytes("5"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))),
+ (Bytes.toBytes("4"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))),
+ (Bytes.toBytes("4"),
+ Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))),
+ (Bytes.toBytes("2"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))),
+ (Bytes.toBytes("2"),
+ Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))))
+
+ val hbaseContext = new HBaseContext(sc, config)
+
+ testFolder.create()
+ val stagingFolder = testFolder.newFolder()
+
+ val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
+
+ val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128,
+ "PREFIX")
+
+ familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)
+
+ hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
+ TableName.valueOf(tableName),
+ t => {
+ val rowKey = t._1
+ val family:Array[Byte] = t._2(0)._1
+ val qualifier = t._2(0)._2
+ val value = t._2(0)._3
+
+        val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
+
+ Seq((keyFamilyQualifier, value)).iterator
+ },
+ stagingFolder.getPath,
+ familyHBaseWriterOptions,
+ compactionExclude = false,
+ HConstants.DEFAULT_MAX_FILE_SIZE)
+
+ val fs = FileSystem.get(config)
+ assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)
+
+    val f1FileList = fs.listStatus(new Path(stagingFolder.getPath + "/f1"))
+    for (i <- 0 until f1FileList.length) {
+ val reader = HFile.createReader(fs, f1FileList(i).getPath,
+ new CacheConfig(config), config)
+ assert(reader.getCompressionAlgorithm.getName.equals("gz"))
+ assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
+ }
+
+    assert(3 == f1FileList.length)
+
+    val f2FileList = fs.listStatus(new Path(stagingFolder.getPath + "/f2"))
+    for (i <- 0 until f2FileList.length) {
+ val reader = HFile.createReader(fs, f2FileList(i).getPath,
+ new CacheConfig(config), config)
+ assert(reader.getCompressionAlgorithm.getName.equals("none"))
+ assert(reader.getDataBlockEncoding.name().equals("NONE"))
+ }
+
+    assert(2 == f2FileList.length)
+
+
+ val conn = ConnectionFactory.createConnection(config)
+
+ val load = new LoadIncrementalHFiles(config)
+ val table = conn.getTable(TableName.valueOf(tableName))
+ try {
+ load.doBulkLoad(new Path(stagingFolder.getPath),
+ conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+
+ val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
+ assert(cells5.size == 1)
+ assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
+
+ val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
+ assert(cells4.size == 2)
+ assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
+ assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
+
+ val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
+ assert(cells3.size == 3)
+ assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
+ assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
+ assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))
+
+
+ val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
+ assert(cells2.size == 2)
+ assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
+ assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
+
+ val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
+ assert(cells1.size == 1)
+ assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
+ assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
+ assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
+
+ } finally {
+ table.close()
+ val admin = ConnectionFactory.createConnection(config).getAdmin
+ try {
+ admin.disableTable(TableName.valueOf(tableName))
+ admin.deleteTable(TableName.valueOf(tableName))
+ } finally {
+ admin.close()
+ }
+ fs.delete(new Path(stagingFolder.getPath), true)
+
+ testFolder.delete()
+
+ }
+ }
+
+ test("bulkLoad partitioner tests") {
+
+ var splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](3)
+ splitKeys(0) = Bytes.toBytes("")
+ splitKeys(1) = Bytes.toBytes("3")
+ splitKeys(2) = Bytes.toBytes("7")
+
+ var partitioner = new BulkLoadPartitioner(splitKeys)
+
+ assert(0 == partitioner.getPartition(Bytes.toBytes("")))
+ assert(0 == partitioner.getPartition(Bytes.toBytes("1")))
+ assert(0 == partitioner.getPartition(Bytes.toBytes("2")))
+ assert(1 == partitioner.getPartition(Bytes.toBytes("3")))
+ assert(1 == partitioner.getPartition(Bytes.toBytes("4")))
+ assert(1 == partitioner.getPartition(Bytes.toBytes("6")))
+ assert(2 == partitioner.getPartition(Bytes.toBytes("7")))
+ assert(2 == partitioner.getPartition(Bytes.toBytes("8")))
+
+
+ splitKeys = new Array[Array[Byte]](1)
+ splitKeys(0) = Bytes.toBytes("")
+
+ partitioner = new BulkLoadPartitioner(splitKeys)
+
+ assert(0 == partitioner.getPartition(Bytes.toBytes("")))
+ assert(0 == partitioner.getPartition(Bytes.toBytes("1")))
+ assert(0 == partitioner.getPartition(Bytes.toBytes("2")))
+ assert(0 == partitioner.getPartition(Bytes.toBytes("3")))
+ assert(0 == partitioner.getPartition(Bytes.toBytes("4")))
+ assert(0 == partitioner.getPartition(Bytes.toBytes("6")))
+ assert(0 == partitioner.getPartition(Bytes.toBytes("7")))
+
+ splitKeys = new Array[Array[Byte]](7)
+ splitKeys(0) = Bytes.toBytes("")
+ splitKeys(1) = Bytes.toBytes("02")
+ splitKeys(2) = Bytes.toBytes("04")
+ splitKeys(3) = Bytes.toBytes("06")
+ splitKeys(4) = Bytes.toBytes("08")
+ splitKeys(5) = Bytes.toBytes("10")
+ splitKeys(6) = Bytes.toBytes("12")
+
+ partitioner = new BulkLoadPartitioner(splitKeys)
+
+ assert(0 == partitioner.getPartition(Bytes.toBytes("")))
+ assert(0 == partitioner.getPartition(Bytes.toBytes("01")))
+ assert(1 == partitioner.getPartition(Bytes.toBytes("02")))
+ assert(1 == partitioner.getPartition(Bytes.toBytes("03")))
+ assert(2 == partitioner.getPartition(Bytes.toBytes("04")))
+ assert(2 == partitioner.getPartition(Bytes.toBytes("05")))
+ assert(3 == partitioner.getPartition(Bytes.toBytes("06")))
+ assert(3 == partitioner.getPartition(Bytes.toBytes("07")))
+ assert(4 == partitioner.getPartition(Bytes.toBytes("08")))
+ assert(4 == partitioner.getPartition(Bytes.toBytes("09")))
+ assert(5 == partitioner.getPartition(Bytes.toBytes("10")))
+ assert(5 == partitioner.getPartition(Bytes.toBytes("11")))
+ assert(6 == partitioner.getPartition(Bytes.toBytes("12")))
+ assert(6 == partitioner.getPartition(Bytes.toBytes("13")))
+
+ }
+
+
+}