You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/04/17 20:55:09 UTC
[30/50] [abbrv] hbase git commit: HBASE-17905: [hbase-spark] bulkload
does not work when table not exist
HBASE-17905: [hbase-spark] bulkload does not work when table not exist
Signed-off-by: tedyu <yu...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/22f602ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/22f602ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/22f602ca
Branch: refs/heads/hbase-12439
Commit: 22f602cab5e9739a650fc962f4b08a0ccc51a972
Parents: 0b5bd78
Author: Yi Liang <ea...@gmail.com>
Authored: Tue Apr 11 15:30:13 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Apr 11 17:01:07 2017 -0700
----------------------------------------------------------------------
.../hadoop/hbase/spark/BulkLoadPartitioner.scala | 13 ++++++++-----
.../apache/hadoop/hbase/spark/HBaseContext.scala | 18 +++++++++++++++++-
2 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/22f602ca/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
index ab4fc41..022c933 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
@@ -33,8 +33,8 @@ import org.apache.spark.Partitioner
@InterfaceAudience.Public
class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
extends Partitioner {
-
- override def numPartitions: Int = startKeys.length
+ // when table not exist, startKeys = Byte[0][]
+ override def numPartitions: Int = if (startKeys.length == 0) 1 else startKeys.length
override def getPartition(key: Any): Int = {
@@ -53,8 +53,11 @@ class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
case _ =>
key.asInstanceOf[Array[Byte]]
}
- val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
- if (partition < 0) partition * -1 + -2
- else partition
+ var partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
+ if (partition < 0)
+ partition = partition * -1 + -2
+ if (partition < 0)
+ partition = 0
+ partition
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/22f602ca/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index e2891db..8c4e0f4 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -48,7 +48,7 @@ import org.apache.spark.streaming.dstream.DStream
import java.io._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
import scala.collection.mutable
/**
@@ -620,9 +620,17 @@ class HBaseContext(@transient sc: SparkContext,
compactionExclude: Boolean = false,
maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
Unit = {
+ val stagingPath = new Path(stagingDir)
+ val fs = stagingPath.getFileSystem(config)
+ if (fs.exists(stagingPath)) {
+ throw new FileAlreadyExistsException("Path " + stagingDir + " already exist")
+ }
val conn = HBaseConnectionCache.getConnection(config)
val regionLocator = conn.getRegionLocator(tableName)
val startKeys = regionLocator.getStartKeys
+ if (startKeys.length == 0) {
+ logInfo("Table " + tableName.toString + " was not found")
+ }
val defaultCompressionStr = config.get("hfile.compression",
Compression.Algorithm.NONE.getName)
val hfileCompression = HFileWriterImpl
@@ -743,9 +751,17 @@ class HBaseContext(@transient sc: SparkContext,
compactionExclude: Boolean = false,
maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
Unit = {
+ val stagingPath = new Path(stagingDir)
+ val fs = stagingPath.getFileSystem(config)
+ if (fs.exists(stagingPath)) {
+ throw new FileAlreadyExistsException("Path " + stagingDir + " already exist")
+ }
val conn = HBaseConnectionCache.getConnection(config)
val regionLocator = conn.getRegionLocator(tableName)
val startKeys = regionLocator.getStartKeys
+ if (startKeys.length == 0) {
+ logInfo("Table " + tableName.toString + " was not found")
+ }
val defaultCompressionStr = config.get("hfile.compression",
Compression.Algorithm.NONE.getName)
val defaultCompression = HFileWriterImpl