Posted to issues@carbondata.apache.org by GitBox <gi...@apache.org> on 2020/02/15 03:42:21 UTC

[GitHub] [carbondata] marchpure opened a new pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622
 
 
    ### Why is this PR needed?
    Currently, cleaning temp index files in the merge index flow takes a long time (sometimes 2~3 minutes), which should be optimized.
   
    ### What changes were proposed in this PR?
    Clean temp index files in parallel in the merge index flow.
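    The diffs in this thread show the cleanup moving from a driver-side thread pool to a Spark job. As a Spark-free illustration of deleting temp files in parallel, here is a local-filesystem sketch on a bounded `java.util.concurrent` pool, sized the same way the removed pre-PR code sized its pool. `ParallelTempCleanup` is a hypothetical name, and the sketch uses `java.nio` rather than CarbonData's `FileFactory`:

```scala
import java.io.IOException
import java.nio.file.{Files, Path}
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParallelTempCleanup {
  /**
   * Deletes each path on a bounded thread pool and waits for all tasks.
   * Returns how many of the paths no longer exist afterwards.
   */
  def deleteAll(paths: Seq[Path], maxThreads: Int = 10): Int = {
    // Same sizing rule as the pre-PR code: at least 1 thread, at most maxThreads.
    val numThreads = math.min(math.max(paths.size, 1), maxThreads)
    val pool = Executors.newFixedThreadPool(numThreads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val tasks = paths.map { p =>
        Future {
          // A failed delete is reported through the existence check below.
          try Files.deleteIfExists(p)
          catch { case _: IOException => false }
          !Files.exists(p)
        }
      }
      Await.result(Future.sequence(tasks), Duration.Inf).count(identity)
    } finally {
      pool.shutdown()
    }
  }
}
```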
       
    ### Does this PR introduce any user interface change?
    - No
   
    ### Is any new testcase added?
    - No
   
       
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#discussion_r379984005
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -191,6 +193,27 @@ object CarbonMergeFilesRDD {
     mergeIndexSize
   }
 
+  /**
+   * delete the file with retry
+   */
+  def deleteFileWithRetry(filePath: String, _retryTimes: Integer): Unit = {
+    var retryTimes = _retryTimes
+    while (!deleteFile(filePath) && retryTimes > 0) {
 
 Review comment:
   Why not use `deleteAllCarbonFilesOfDir`?
   
   Because at line 175 you have to loop again to delete the directories, which may be slow on S3.
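   To illustrate the reviewer's point, here is a local-filesystem sketch using `java.nio` (it is not CarbonData's `FileFactory`, and not the body of `deleteAllCarbonFilesOfDir`, which this thread does not show): one recursive call removes the files and their directory together, so no second loop is needed just to remove the emptied folders. How this compares on S3 depends on the filesystem connector; the sketch does not measure it. `RecursiveDelete` is a hypothetical name.

```scala
import java.nio.file.{Files, Path}
import java.util.Comparator

object RecursiveDelete {
  /**
   * Deletes `root` and everything beneath it in one traversal.
   * Does nothing if `root` does not exist.
   */
  def deleteTree(root: Path): Unit =
    if (Files.exists(root)) {
      val stream = Files.walk(root)
      try {
        // Reverse path order puts every child before its parent directory,
        // so each directory is already empty when its turn comes.
        val deepestFirst = stream.sorted(Comparator.reverseOrder[Path]()).toArray()
        deepestFirst.foreach(p => Files.delete(p.asInstanceOf[Path]))
      } finally {
        stream.close()
      }
    }
}
```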


[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416336
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
+            .listFiles().toList
+        }.toList.flatten.map(_.getAbsolutePath)
+      // delete files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
+        deleteFileWithRetry(_, 3)
+      }.collect()
+      // delete dirs
       partitionInfo
         .asScala
         .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+          FileFactory.deleteFile(partitionPath + "/" + tempFolderPath)
 
 Review comment:
   This can also be implemented with an RDD.


[GitHub] [carbondata] CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586551417
 
 
   Build Success with Spark 2.4.4. Please check CI: http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/299/
   


[GitHub] [carbondata] marchpure commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586568016
 
 
   retest this please


[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#discussion_r382525176
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
 
 Review comment:
   You can use `CarbonCommonConstants.FILE_SEPARATOR` instead of `"/"`.
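   A sketch of the suggestion, with a hypothetical local constant standing in for `CarbonCommonConstants.FILE_SEPARATOR` (assumed here to be `"/"`) so it compiles without CarbonData. The benefit is that every path join in the cleanup code goes through one named constant instead of repeated string literals. `TempFolderPaths` and `tempFolderOf` are hypothetical names.

```scala
object TempFolderPaths {
  // Hypothetical stand-in for CarbonCommonConstants.FILE_SEPARATOR (assumed to be "/").
  val FileSeparator: String = "/"

  /** Joins a partition path and the temp folder name with the shared separator constant. */
  def tempFolderOf(partitionPath: String, tempFolderPath: String): String =
    partitionPath + FileSeparator + tempFolderPath
}
```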
   


[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416741
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -191,6 +193,27 @@ object CarbonMergeFilesRDD {
     mergeIndexSize
   }
 
+  /**
+   * delete the file with retry
+   */
+  def deleteFileWithRetry(filePath: String, _retryTimes: Integer): Unit = {
 
 Review comment:
   Make it private.


[GitHub] [carbondata] CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586575134
 
 
   Build Success with Spark 2.3.4. Please check CI: http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2009/
   


[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416668
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
+            .listFiles().toList
+        }.toList.flatten.map(_.getAbsolutePath)
+      // delete files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
 
 Review comment:
   Suggest adding a threshold check: use an RDD only when `allTmpFiles.length` exceeds the threshold.
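   A minimal sketch of the threshold check, assuming the caller supplies both deletion strategies. In the PR the distributed one would be the `sparkContext.parallelize(...)` job; here both are plain functions so the sketch runs without Spark. `ThresholdCleanup`, `Mode`, and the threshold value are hypothetical; the thread does not name a config property for it.

```scala
object ThresholdCleanup {
  sealed trait Mode
  case object Local extends Mode
  case object Distributed extends Mode

  /** Use the distributed path only when the file count exceeds the threshold. */
  def modeFor(fileCount: Int, threshold: Int): Mode =
    if (fileCount > threshold) Distributed else Local

  /** Runs exactly one of the two strategies and reports which one it chose. */
  def cleanup(paths: Seq[String], threshold: Int)(
      deleteLocally: Seq[String] => Unit,
      deleteDistributed: Seq[String] => Unit): Mode = {
    val mode = modeFor(paths.size, threshold)
    mode match {
      case Local       => deleteLocally(paths)
      case Distributed => deleteDistributed(paths)
    }
    mode
  }
}
```

   Presumably the motivation is that scheduling a Spark job has a fixed cost, so a handful of files is cheaper to delete on the driver.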


[GitHub] [carbondata] CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586569602
 
 
   Build Success with Spark 2.4.4. Please check CI: http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/306/
   


[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#discussion_r380415731
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
 
 Review comment:
   Maybe you can use `flatMap` here.
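   The listing step in the diff builds a list per partition and then flattens it (`.map { ... }.toList.flatten`); `flatMap` does both in one step. A sketch with the directory listing injected as a function so it runs without a filesystem (`TempFileListing` and `listFiles` are hypothetical names; in the PR the listing is `FileFactory.getCarbonFile(...).listFiles()`):

```scala
object TempFileListing {
  /** Every file under each partition's temp folder, in partition order. */
  def allTmpFiles(
      partitionPaths: Seq[String],
      tempFolderPath: String,
      listFiles: String => Seq[String]): List[String] =
    partitionPaths.flatMap(p => listFiles(p + "/" + tempFolderPath)).toList
}
```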


[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416759
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -191,6 +193,27 @@ object CarbonMergeFilesRDD {
     mergeIndexSize
   }
 
+  /**
+   * delete the file with retry
+   */
+  def deleteFileWithRetry(filePath: String, _retryTimes: Integer): Unit = {
+    var retryTimes = _retryTimes
+    while (!deleteFile(filePath) && retryTimes > 0) {
+      retryTimes -= 1
+    }
+  }
+
+  /**
+   * delete the file
+   */
+  def deleteFile(filePath: String): Boolean = {
 
 Review comment:
   Make it private.
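   A sketch of the retry helper with the suggested visibility, using an injected `tryDelete` function so it runs without CarbonData (`TempFileCleaner`, `deleteWithRetry`, and `tryDelete` are hypothetical names). It keeps the diff's semantics of one attempt plus up to `retries` retries, and uses Scala's `Int` rather than `java.lang.Integer` for the count:

```scala
object TempFileCleaner {
  /** Public entry point: returns the paths that still could not be deleted. */
  def deleteAll(paths: Seq[String], retries: Int)(tryDelete: String => Boolean): Seq[String] =
    paths.filterNot(p => deleteWithRetry(p, retries, tryDelete))

  // Private, as the review suggests: nothing outside this object needs it.
  // One attempt plus up to `retries` retries, i.e. at most retries + 1 calls,
  // the same count as the while-loop in the diff. Iterator.exists stops at the
  // first successful attempt.
  private def deleteWithRetry(path: String, retries: Int, tryDelete: String => Boolean): Boolean =
    Iterator.range(0, retries + 1).exists(_ => tryDelete(path))
}
```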


[GitHub] [carbondata] CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586554100
 
 
   Build Failed with Spark 2.3.4. Please check CI: http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2003/
   


[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#discussion_r382525290
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
+            .listFiles().toList
+        }.toList.flatten.map(_.getAbsolutePath)
+      // delete files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
+        deleteFileWithRetry(_, 3)
+      }.collect()
+      // delete dirs
       partitionInfo
         .asScala
         .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+          FileFactory.deleteFile(partitionPath + "/" + tempFolderPath)
 
 Review comment:
   You can use `CarbonCommonConstants.FILE_SEPARATOR` instead of `"/"`.


[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416855
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -191,6 +193,27 @@ object CarbonMergeFilesRDD {
     mergeIndexSize
   }
 
+  /**
+   * delete the file with retry
+   */
+  def deleteFileWithRetry(filePath: String, _retryTimes: Integer): Unit = {
+    var retryTimes = _retryTimes
+    while (!deleteFile(filePath) && retryTimes > 0) {
+      retryTimes -= 1
+    }
+  }
+
+  /**
+   * delete the file
+   */
+  def deleteFile(filePath: String): Boolean = {
+    val success = FileFactory.deleteFile(filePath)
+    if (!success && FileFactory.isFileExist(filePath)) {
 
 Review comment:
   Isn't `if (!success)` alone enough?
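   One possible reason to keep the existence check: if `FileFactory.deleteFile` behaves like `java.io.File.delete()` (an assumption; its contract is not shown in this thread), then `false` can also mean the file was already removed, for example by an earlier attempt or another task, and `!success` alone would trigger needless retries. A local sketch that treats "already gone" as success (`IdempotentDelete` is a hypothetical name):

```scala
import java.io.File

object IdempotentDelete {
  /**
   * Returns true when the file is gone afterwards, whether this call removed it
   * or it was already missing. java.io.File.delete() alone returns false in both
   * the "could not delete" and the "nothing to delete" cases.
   */
  def deleteFile(filePath: String): Boolean = {
    val file = new File(filePath)
    file.delete() || !file.exists()
  }
}
```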


[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416336
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
+            .listFiles().toList
+        }.toList.flatten.map(_.getAbsolutePath)
+      // delete files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
+        deleteFileWithRetry(_, 3)
+      }.collect()
+      // delete dirs
       partitionInfo
         .asScala
         .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+          FileFactory.deleteFile(partitionPath + "/" + tempFolderPath)
 
 Review comment:
   This can also be implemented with an RDD, if the threshold is exceeded.
