Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/27 03:12:29 UTC

[GitHub] [incubator-seatunnel] Carl-Zhou-CN commented on a diff in pull request #3169: [Bug][Connector-V1-Spark-Hbase] When the written df is empty, the directory does not exist when the load file is loaded

Carl-Zhou-CN commented on code in PR #3169:
URL: https://github.com/apache/incubator-seatunnel/pull/3169#discussion_r1006364175


##########
seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala:
##########
@@ -132,31 +133,27 @@ class Hbase extends SparkBatchSink with Logging {
         },
         stagingDir)
 
-      val load = new LoadIncrementalHFiles(hbaseConf)
-      val table = hbaseConn.getTable(tableName)
-      load.doBulkLoad(
-        new Path(stagingDir),
-        hbaseConn.getAdmin,
-        table,
-        hbaseConn.getRegionLocator(tableName))
+      if (fs.exists(stagingPath)) {
+        val load = new LoadIncrementalHFiles(hbaseConf)
+        val table = hbaseConn.getTable(tableName)
+        load.doBulkLoad(
+          stagingPath,
+          hbaseConn.getAdmin,
+          table,
+          hbaseConn.getRegionLocator(tableName))
+      }
 
     } finally {
       if (hbaseConn != null) {
         hbaseConn.close()
       }
-
-      cleanUpStagingDir(stagingDir)
+      cleanUpStagingDir(stagingPath, fs)
     }
   }
 
-  private def cleanUpStagingDir(stagingDir: String): Unit = {
-    val stagingPath = new Path(stagingDir)
-    val fs = stagingPath.getFileSystem(hbaseContext.config)
+  private def cleanUpStagingDir(stagingPath: Path, fs: FileSystem): Unit = {
     if (!fs.delete(stagingPath, true)) {
-      logWarning(s"clean staging dir $stagingDir failed")
-    }
-    if (fs != null) {
-      fs.close()

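A minimal, self-contained sketch of the pattern the diff introduces (not the connector's actual code): when the written DataFrame is empty, Spark never creates the staging directory, so the bulk load is skipped instead of failing on a missing path. The helper name bulkLoadIfStaged and its parameters are illustrative stand-ins for the values used above; the import assumes HBase 1.x, where LoadIncrementalHFiles lives in org.apache.hadoop.hbase.mapreduce (HBase 2.x moved it to org.apache.hadoop.hbase.tool).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

def bulkLoadIfStaged(
    hbaseConf: Configuration,
    hbaseConn: Connection,
    tableName: TableName,
    stagingPath: Path,
    fs: FileSystem): Unit = {
  // An empty write leaves no staging directory; doBulkLoad on a missing
  // path would fail, so only load when the directory exists.
  if (fs.exists(stagingPath)) {
    val load = new LoadIncrementalHFiles(hbaseConf)
    val table = hbaseConn.getTable(tableName)
    try {
      load.doBulkLoad(
        stagingPath,
        hbaseConn.getAdmin,
        table,
        hbaseConn.getRegionLocator(tableName))
    } finally {
      table.close()
    }
  }
}
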
Review Comment:
   The FileSystem cannot be closed at this point: Hadoop caches FileSystem instances, so the fs obtained here is the same object Spark holds internally. Closing it would also close Spark's handle and cause the SparkContext to exit abnormally.
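
A minimal sketch (assumed, not part of the PR) of the Hadoop behavior behind this comment: FileSystem.get(conf) returns a cached instance keyed by scheme, authority and user, so it is normally the very object Spark is already using, and closing it breaks Spark's subsequent filesystem access.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object FsCacheDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val fs1 = FileSystem.get(conf)
    val fs2 = FileSystem.get(conf)

    // Same cached object unless fs.<scheme>.impl.disable.cache is set.
    println(fs1 eq fs2) // usually prints: true

    fs1.close()
    // On HDFS, any further use of fs2 (including Spark's own internal calls
    // on the shared instance) now fails with "Filesystem closed", which is
    // why the fix deletes the staging directory but leaves fs open.
  }
}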


