Posted to commits@hbase.apache.org by te...@apache.org on 2017/04/12 00:18:54 UTC

hbase git commit: HBASE-17905 [hbase-spark] bulkload does not work when the table does not exist

Repository: hbase
Updated Branches:
  refs/heads/master 02da5a610 -> d7ddc7919


HBASE-17905 [hbase-spark] bulkload does not work when the table does not exist

Signed-off-by: tedyu <yu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d7ddc791
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d7ddc791
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d7ddc791

Branch: refs/heads/master
Commit: d7ddc79198679d8c642e7d8ad5141ba518f8d9f3
Parents: 02da5a6
Author: Yi Liang <ea...@gmail.com>
Authored: Tue Apr 11 17:04:40 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Apr 11 17:18:49 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/spark/BulkLoadPartitioner.scala  | 13 ++++++++-----
 .../apache/hadoop/hbase/spark/HBaseContext.scala  | 18 +++++++++++++++++-
 2 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
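
The change has two parts: BulkLoadPartitioner now falls back to a single partition
when the table has no start keys (previously numPartitions was 0 and getPartition
could return -1), and HBaseContext fails fast with FileAlreadyExistsException when
the bulk-load staging directory already exists, logging when the start keys come
back empty. Even with the fallback, the target table must exist for the final load
to succeed, so a caller would typically pre-create it. A minimal sketch using the
classic Admin API (the table and column-family names are illustrative assumptions,
not part of the patch):

  import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
  import org.apache.hadoop.hbase.client.ConnectionFactory

  // Pre-create the target table so region start keys exist before bulk loading.
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val admin = conn.getAdmin
  val table = TableName.valueOf("bulkLoadTable")
  if (!admin.tableExists(table)) {
    val desc = new HTableDescriptor(table)
    desc.addFamily(new HColumnDescriptor("cf"))
    admin.createTable(desc)
  }
  admin.close()
  conn.close()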


http://git-wip-us.apache.org/repos/asf/hbase/blob/d7ddc791/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
index ab4fc41..022c933 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
@@ -33,8 +33,8 @@ import org.apache.spark.Partitioner
 @InterfaceAudience.Public
 class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
   extends Partitioner {
-
-  override def numPartitions: Int = startKeys.length
+  // when the table does not exist, startKeys is empty (an Array[Array[Byte]] of length 0)
+  override def numPartitions: Int = if (startKeys.length == 0) 1 else startKeys.length
 
   override def getPartition(key: Any): Int = {
 
@@ -53,8 +53,11 @@ class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
         case _ =>
           key.asInstanceOf[Array[Byte]]
       }
-    val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
-    if (partition < 0) partition * -1 + -2
-    else partition
+    var partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
+    if (partition < 0)
+      partition = partition * -1 + -2
+    if (partition < 0)
+      partition = 0
+    partition
   }
 }
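
To see what the clamp does, a small sketch of the patched partitioner when the
start-key array is empty (the expected values follow from the arithmetic above;
the standalone usage itself is illustrative):

  import org.apache.hadoop.hbase.spark.BulkLoadPartitioner
  import org.apache.hadoop.hbase.util.Bytes

  // With no start keys, binarySearch returns -1; the old code mapped that to
  // partition -1 alongside numPartitions == 0, which breaks the Spark shuffle.
  val partitioner = new BulkLoadPartitioner(Array.empty[Array[Byte]])
  println(partitioner.numPartitions)                        // 1 (was 0)
  println(partitioner.getPartition(Bytes.toBytes("row1")))  // 0 (was -1)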

http://git-wip-us.apache.org/repos/asf/hbase/blob/d7ddc791/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index e2891db..1948bd3 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -48,7 +48,7 @@ import org.apache.spark.streaming.dstream.DStream
 import java.io._
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
 import scala.collection.mutable
 
 /**
@@ -620,9 +620,17 @@ class HBaseContext(@transient sc: SparkContext,
                   compactionExclude: Boolean = false,
                   maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
   Unit = {
+    val stagingPath = new Path(stagingDir)
+    val fs = stagingPath.getFileSystem(config)
+    if (fs.exists(stagingPath)) {
+      throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
+    }
     val conn = HBaseConnectionCache.getConnection(config)
     val regionLocator = conn.getRegionLocator(tableName)
     val startKeys = regionLocator.getStartKeys
+    if (startKeys.length == 0) {
+      logInfo("Table " + tableName.toString + " was not found")
+    }
     val defaultCompressionStr = config.get("hfile.compression",
       Compression.Algorithm.NONE.getName)
     val hfileCompression = HFileWriterImpl
@@ -743,9 +751,17 @@ class HBaseContext(@transient sc: SparkContext,
                   compactionExclude: Boolean = false,
                   maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
   Unit = {
+    val stagingPath = new Path(stagingDir)
+    val fs = stagingPath.getFileSystem(config)
+    if (fs.exists(stagingPath)) {
+      throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
+    }
     val conn = HBaseConnectionCache.getConnection(config)
     val regionLocator = conn.getRegionLocator(tableName)
     val startKeys = regionLocator.getStartKeys
+    if (startKeys.length == 0) {
+      logInfo("Table " + tableName.toString + " was not found")
+    }
     val defaultCompressionStr = config.get("hfile.compression",
       Compression.Algorithm.NONE.getName)
     val defaultCompression = HFileWriterImpl
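
For completeness, a hedged end-to-end sketch of a bulk load with the new guard in
mind (the Spark wiring, table name, staging path, and row data are assumptions;
HBaseContext, KeyFamilyQualifier, and the bulkLoad signature come from hbase-spark):

  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("bulkload-example"))
  val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

  val rdd = sc.parallelize(Seq(
    (Bytes.toBytes("row1"), (Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")))))

  // After this patch, the staging directory must not already exist; otherwise
  // bulkLoad throws FileAlreadyExistsException before writing any HFiles.
  hbaseContext.bulkLoad[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))](
    rdd,
    TableName.valueOf("bulkLoadTable"),
    t => {
      val (rowKey, (family, qualifier, value)) = t
      Iterator((new KeyFamilyQualifier(rowKey, family, qualifier), value))
    },
    "/tmp/hbase-bulkload-staging")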