Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/09/16 14:34:43 UTC

[carbondata] branch master updated: [CARBONDATA-3989] Fix unnecessary segment files creation when segment is neither updated nor deleted.

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d85bcb5  [CARBONDATA-3989] Fix unnecessary segment files creation when segment is neither updated nor deleted.
d85bcb5 is described below

commit d85bcb51da9190a613828bd1bb047ed4548a0dc3
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Mon Sep 7 15:29:32 2020 +0530

    [CARBONDATA-3989] Fix unnecessary segment files creation when segment is neither updated nor deleted.
    
    Why is this PR needed?
    An unnecessary segment file is created for a segment even though that segment is neither updated nor deleted.
    In addition, once segment files are merged, the old segment files are left behind and keep accumulating until a clean files operation is run.
    
    What changes were proposed in this PR?
    Create new segment files only for segments whose data is actually updated or deleted.
    Once the segment file merge is done, delete the old segment files so that segment files do not pile up in the metadata directory (a condensed sketch of this flow follows the diff below).
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3927
---
 .../spark/rdd/CarbonDataRDDFactory.scala           | 74 ++++++++++++----------
 1 file changed, 42 insertions(+), 32 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index fc3f578..6d32dae 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -54,7 +54,7 @@ import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarForma
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, CarbonUtil, SessionParams, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.view.{MVSchema, MVStatus}
@@ -731,43 +731,53 @@ object CarbonDataRDDFactory {
     val metadataDetails =
       SegmentStatusManager.readTableStatusFile(
         CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
-    val segmentFiles = segmentDetails.asScala.map { seg =>
-      val load =
-        metadataDetails.find(_.getLoadName.equals(seg.getSegmentNo)).get
-      val segmentFile = load.getSegmentFile
-      var segmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
+    val updateTableStatusFile = CarbonUpdateUtil.getUpdateStatusFileName(updateModel
+      .updatedTimeStamp.toString)
+    val updatedSegments = SegmentUpdateStatusManager.readLoadMetadata(updateTableStatusFile,
+      carbonTable.getTablePath).map(_.getSegmentName).toSet
+    val segmentFiles = segmentDetails.asScala.map { segment =>
+      // create new segment files and merge for only updated segments
+      if (updatedSegments.contains(segment.getSegmentNo)) {
+        val load =
+          metadataDetails.find(_.getLoadName.equals(segment.getSegmentNo)).get
+        val segmentFile = load.getSegmentFile
+        var segmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
 
-      val segmentMetaDataInfo = segmentMetaDataInfoMap.get(seg.getSegmentNo)
-      val file = SegmentFileStore.writeSegmentFile(
-        carbonTable,
-        seg.getSegmentNo,
-        String.valueOf(System.currentTimeMillis()),
-        load.getPath,
-        segmentMetaDataInfo)
+        val segmentMetaDataInfo = segmentMetaDataInfoMap.get(segment.getSegmentNo)
+        val segmentFileName = SegmentFileStore.writeSegmentFile(
+          carbonTable,
+          segment.getSegmentNo,
+          String.valueOf(System.currentTimeMillis()),
+          load.getPath,
+          segmentMetaDataInfo)
 
-      if (segmentFile != null) {
-        segmentFiles ++= FileFactory.getCarbonFile(
+        if (segmentFile != null) segmentFiles ++= FileFactory.getCarbonFile(
           SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFile)) :: Nil
-      }
-      val updatedSegFile = if (file != null) {
-        val carbonFile = FileFactory.getCarbonFile(
-          SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, file))
-        segmentFiles ++= carbonFile :: Nil
+        val updatedSegFile = if (segmentFileName != null) {
+          val segmentCarbonFile = FileFactory.getCarbonFile(
+            SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFileName))
+          segmentFiles ++= segmentCarbonFile :: Nil
 
-        val mergedSegFileName = SegmentFileStore.genSegmentFileName(
-          seg.getSegmentNo,
-          updateModel.updatedTimeStamp.toString)
-        SegmentFileStore.mergeSegmentFiles(
-          mergedSegFileName,
-          CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
-          segmentFiles.toArray)
-        carbonFile.delete()
-        mergedSegFileName + CarbonTablePath.SEGMENT_EXT
+          val mergedSegFileName = SegmentFileStore.genSegmentFileName(
+            segment.getSegmentNo,
+            updateModel.updatedTimeStamp.toString)
+          SegmentFileStore.mergeSegmentFiles(
+            mergedSegFileName,
+            CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
+            segmentFiles.toArray)
+          segmentFiles.foreach { oldSegmentFile =>
+            oldSegmentFile.delete()
+            LOGGER.debug(s"Old segment file is deleted after segment file merge: ${
+              oldSegmentFile.getName
+            }")
+          }
+          mergedSegFileName + CarbonTablePath.SEGMENT_EXT
+        } else null
+
+        new Segment(segment.getSegmentNo, updatedSegFile)
       } else {
-        null
+        segment
       }
-
-      new Segment(seg.getSegmentNo, updatedSegFile)
     }.filter(_.getSegmentFileName != null).asJava
     segmentFiles
   }
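
For readability, the patched block above boils down to the condensed Scala sketch below. This is not the committed code: the helper name rewriteSegmentFilesForUpdate, its parameter list, and the Map-based return value are illustrative, and the import paths for SegmentFileStore, FileFactory and CarbonFile are assumed from the CarbonData 2.x source layout; the method calls and their signatures are the ones that appear in the diff.

import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath

object UpdatedSegmentFileSketch {

  /**
   * Sketch of the new flow: write and merge segment files only for segments that the
   * update actually touched, delete the pre-merge files right after the merge, and
   * leave untouched segments alone. Returns segmentNo -> segment file name; the caller
   * is expected to drop entries whose value is null, mirroring the filter in the diff.
   */
  def rewriteSegmentFilesForUpdate(
      carbonTable: CarbonTable,
      segmentNos: Seq[String],
      segmentMetaDataInfoMap: java.util.Map[String, SegmentMetaDataInfo],
      updateTimestamp: String): Map[String, String] = {
    val metadataDetails = SegmentStatusManager.readTableStatusFile(
      CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
    // Segments touched by this update, read from the update status file written
    // for this update's timestamp.
    val updateStatusFile = CarbonUpdateUtil.getUpdateStatusFileName(updateTimestamp)
    val updatedSegments = SegmentUpdateStatusManager
      .readLoadMetadata(updateStatusFile, carbonTable.getTablePath)
      .map(_.getSegmentName).toSet

    segmentNos.map { segmentNo =>
      val load = metadataDetails.find(_.getLoadName.equals(segmentNo)).get
      if (!updatedSegments.contains(segmentNo)) {
        // Not updated or deleted: keep the existing segment file, write nothing new.
        segmentNo -> load.getSegmentFile
      } else {
        // Updated: write a fresh segment file for the segment.
        val newSegmentFile = SegmentFileStore.writeSegmentFile(
          carbonTable, segmentNo, String.valueOf(System.currentTimeMillis()),
          load.getPath, segmentMetaDataInfoMap.get(segmentNo))
        if (newSegmentFile == null) {
          segmentNo -> null
        } else {
          // Merge the old segment file (if any) with the new one into a single file.
          val toMerge: Seq[CarbonFile] =
            Seq(Option(load.getSegmentFile), Some(newSegmentFile)).flatten.map { name =>
              FileFactory.getCarbonFile(
                SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, name))
            }
          val mergedName = SegmentFileStore.genSegmentFileName(segmentNo, updateTimestamp)
          SegmentFileStore.mergeSegmentFiles(
            mergedName,
            CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
            toMerge.toArray)
          // Delete the pre-merge files so the metadata directory does not keep growing.
          toMerge.foreach(_.delete())
          segmentNo -> (mergedName + CarbonTablePath.SEGMENT_EXT)
        }
      }
    }.toMap
  }
}

Compared to the old flow, the per-segment work (writeSegmentFile plus mergeSegmentFiles) now runs only for segments found in the update status file, and the pre-merge segment files are deleted immediately after the merge instead of lingering until clean files is executed.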