Posted to issues@carbondata.apache.org by GitBox <gi...@apache.org> on 2021/02/19 12:44:53 UTC

[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

Indhumathi27 commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r579104989



##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -129,8 +127,13 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
     String partitionTempPath = "";
     for (String partition : partitionInfo) {
       if (partitionPath.equalsIgnoreCase(partition)) {
-        partitionTempPath = partition + "/" + tempFolderPath;
-        break;
+        if (tempFolderPath != null) {
+          partitionTempPath = partition + "/" + tempFolderPath;
+          break;
+        } else {
+          fileStore.readAllIIndexOfSegment(partition);

Review comment:
       Please add comments explaining why this change is required.

##########
File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##########
@@ -290,6 +290,7 @@ public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
             SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+        boolean isUpdateRequired = false;

Review comment:
       Please add comments explaining what this flag is for.
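
       As context for the review ask, here is a minimal, self-contained Java sketch of the dirty-flag pattern this new variable appears to introduce: rewrite the table status file only when at least one load entry actually changed. The types and helpers below (LoadEntry, writeStatusFile) are simplified stand-ins rather than the actual CarbonData API, and the stated intent is an assumption based on the PR title.

           // Illustrative only: persist the status file only when an entry changed.
           import java.util.List;

           class TableStatusUpdater {
             static class LoadEntry {
               String segmentId;
               String status;
             }

             void updateStatus(List<LoadEntry> entries, List<String> updatedSegments, String newStatus) {
               boolean isUpdateRequired = false;
               for (LoadEntry entry : entries) {
                 if (updatedSegments.contains(entry.segmentId) && !newStatus.equals(entry.status)) {
                   entry.status = newStatus;
                   isUpdateRequired = true;   // remember that at least one entry changed
                 }
               }
               if (isUpdateRequired) {
                 writeStatusFile(entries);    // skip the rewrite entirely when nothing changed
               }
             }

             void writeStatusFile(List<LoadEntry> entries) {
               // placeholder for the actual table status write
             }
           }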

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
##########
@@ -85,7 +87,21 @@ class AlterTableMergeIndexSIEventListener
                   .asScala
                 val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
                 validSegments.foreach { segment =>
-                  validSegmentIds += segment.getSegmentNo
+                  val segmentFile = segment.getSegmentFileName
+                  val sfs = new SegmentFileStore(indexCarbonTable.getTablePath, segmentFile)

Review comment:
       Same comment as above.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,73 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  /**
+   * Gets the old and invalid files which have already been merged into a mergeindex file. In a
+   * segment folder we may have both .index files and .mergeindex files, because index files are
+   * not deleted immediately for old tables. This method reads the mergeindex file, adds the
+   * index files mapped in it to a list and returns that list. If more than one mergeindex file
+   * is present, the latest one is considered valid.
+   * Ex: Segment_0 has three files: 1.index, 1.mergeindex and 1.carbondata, where 1.index has
+   * been merged into 1.mergeindex. This method then returns the already merged file: 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+    long lastModifiedTime = 0L;
+    String validIndexFile = null;
+    List<String> mergeIndexFileNames = new ArrayList<>();
+    for (String indexFile : indexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore.readMergeFile(indexFile);

Review comment:
       Please check that, in the query flow, the merge file is read only once when only one valid merge index file is present.
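
       To illustrate the concern, here is a minimal Java sketch of one way to guarantee a single read per merge index file in the query flow, by caching the parsed content. The class and helper names below are stand-ins, not the CarbonData API.

           // Illustrative only: parse each .mergeindex file at most once per query.
           import java.util.Collections;
           import java.util.HashMap;
           import java.util.List;
           import java.util.Map;

           class MergeIndexCache {
             private final Map<String, List<String>> parsed = new HashMap<>();

             // stand-in for the actual merge index reader
             private List<String> readMergeFile(String mergeIndexFile) {
               return Collections.emptyList();  // placeholder for real parsing
             }

             List<String> indexFilesIn(String mergeIndexFile) {
               // computeIfAbsent guarantees a single read per distinct merge index file
               return parsed.computeIfAbsent(mergeIndexFile, this::readMergeFile);
             }
           }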

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########
@@ -140,19 +141,47 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
                   .get
                   .filterNot(streamingSegment.contains(_))
               }
+            validSegments.foreach { segment =>
+              if (segmentsToMerge.contains(segment.getSegmentNo)) {
+                val segmentFile = segment.getSegmentFileName
+                val sfs = new SegmentFileStore(carbonMainTable.getTablePath, segmentFile)

Review comment:
       It looks like the segment file is read twice here. Please check and handle this.

##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -185,13 +193,23 @@ object CarbonMergeFilesRDD {
         val readPath: String = CarbonTablePath.getSegmentFilesLocation(tablePath) +
                                CarbonCommonConstants.FILE_SEPARATOR + segmentId + "_" +
                                segmentFileNameToSegmentIdMap.get(segmentId) + ".tmp"
+        val uuid = String.valueOf(System.currentTimeMillis)
+        val newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
         // Merge all partition files into a single file.
-        val segmentFileName: String = SegmentFileStore
-          .genSegmentFileName(segmentId, segmentFileNameToSegmentIdMap.get(segmentId))
-        SegmentFileStore
+        val segmentFile = SegmentFileStore
           .mergeSegmentFiles(readPath,
-            segmentFileName,
+            newSegmentFileName,
             CarbonTablePath.getSegmentFilesLocation(tablePath))
+        if (segmentFile != null) {
+          val sfs = new SegmentFileStore(tablePath, newSegmentFileName +
+            CarbonTablePath.SEGMENT_EXT)
+          val status = SegmentFileStore.updateTableStatusFile(carbonTable, segmentId,

Review comment:
       Please check in which flow this is called, and verify that the caller does not update the table status file again.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,73 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  /**
+   * Gets the old and invalid files which have already been merged into a mergeindex file. In a
+   * segment folder we may have both .index files and .mergeindex files, because index files are
+   * not deleted immediately for old tables. This method reads the mergeindex file, adds the
+   * index files mapped in it to a list and returns that list. If more than one mergeindex file
+   * is present, the latest one is considered valid.
+   * Ex: Segment_0 has three files: 1.index, 1.mergeindex and 1.carbondata, where 1.index has
+   * been merged into 1.mergeindex. This method then returns the already merged file: 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+    long lastModifiedTime = 0L;
+    String validIndexFile = null;
+    List<String> mergeIndexFileNames = new ArrayList<>();
+    for (String indexFile : indexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore.readMergeFile(indexFile);
+        Map<String, List<String>> carbonMergeFileToIndexFilesMap =

Review comment:
       1. If no index files are present, then there is no need to read the merge index files.
       2. If index files are present and more than one merge index file is present, then find which one is valid and read only that one.
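
       A minimal, self-contained Java sketch of the two suggestions above; the class and helper names are stand-ins rather than the CarbonData API, and the file extensions are simplified (the real code uses CarbonTablePath constants).

           // Illustrative only: skip parsing merge index files when there is nothing
           // to filter, and parse only the single valid (latest) merge index file.
           import java.io.File;
           import java.util.ArrayList;
           import java.util.Collections;
           import java.util.HashSet;
           import java.util.List;
           import java.util.Set;

           class InvalidIndexFileFinder {
             // stand-in for reading a merge index and returning the .index file names it covers
             List<String> indexNamesInMergeFile(String mergeIndexFile) {
               return Collections.emptyList();  // placeholder
             }

             Set<String> findInvalidFiles(List<String> segmentFiles) {
               List<String> indexFiles = new ArrayList<>();
               List<String> mergeIndexFiles = new ArrayList<>();
               for (String file : segmentFiles) {
                 if (file.endsWith(".mergeindex")) {
                   mergeIndexFiles.add(file);
                 } else if (file.endsWith(".index")) {
                   indexFiles.add(file);
                 }
               }
               Set<String> invalid = new HashSet<>();
               if (mergeIndexFiles.isEmpty()) {
                 return invalid;  // nothing was merged, so every .index file is still valid
               }
               // Suggestion 2: the most recently modified merge index file is the valid one
               String validMergeIndex = mergeIndexFiles.get(0);
               for (String candidate : mergeIndexFiles) {
                 if (new File(candidate).lastModified() > new File(validMergeIndex).lastModified()) {
                   validMergeIndex = candidate;
                 }
               }
               for (String candidate : mergeIndexFiles) {
                 if (!candidate.equals(validMergeIndex)) {
                   invalid.add(candidate);  // older merge index files are stale
                 }
               }
               // Suggestion 1: parse the valid merge index only when plain .index files
               // exist alongside it and need to be filtered out
               if (!indexFiles.isEmpty()) {
                 invalid.addAll(indexNamesInMergeFile(validMergeIndex));
               }
               return invalid;
             }
           }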




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org