Posted to issues@carbondata.apache.org by GitBox <gi...@apache.org> on 2021/03/01 12:47:22 UTC

[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r584683576



##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,73 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
+   * we may have both .index files and .mergeindex files, as we are not deleting index files
+   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
+   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
+   * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+    long lastModifiedTime = 0L;
+    String validIndexFile = null;
+    List<String> mergeIndexFileNames = new ArrayList<>();
+    for (String indexFile : indexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore.readMergeFile(indexFile);
+        Map<String, List<String>> carbonMergeFileToIndexFilesMap =

Review comment:
       Done

##########
File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##########
@@ -290,6 +290,7 @@ public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
             SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+        boolean isUpdateRequired = false;

Review comment:
       Done

##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -185,13 +193,23 @@ object CarbonMergeFilesRDD {
         val readPath: String = CarbonTablePath.getSegmentFilesLocation(tablePath) +
                                CarbonCommonConstants.FILE_SEPARATOR + segmentId + "_" +
                                segmentFileNameToSegmentIdMap.get(segmentId) + ".tmp"
+        val uuid = String.valueOf(System.currentTimeMillis)
+        val newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
         // Merge all partition files into a single file.
-        val segmentFileName: String = SegmentFileStore
-          .genSegmentFileName(segmentId, segmentFileNameToSegmentIdMap.get(segmentId))
-        SegmentFileStore
+        val segmentFile = SegmentFileStore
           .mergeSegmentFiles(readPath,
-            segmentFileName,
+            newSegmentFileName,
             CarbonTablePath.getSegmentFilesLocation(tablePath))
+        if (segmentFile != null) {
+          val sfs = new SegmentFileStore(tablePath, newSegmentFileName +
+            CarbonTablePath.SEGMENT_EXT)
+          val status = SegmentFileStore.updateTableStatusFile(carbonTable, segmentId,

Review comment:
       This flow is called when mergeIndex is run on old tables. The update happens only once.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########
@@ -140,19 +141,47 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
                   .get
                   .filterNot(streamingSegment.contains(_))
               }
+            validSegments.foreach { segment =>
+              if (segmentsToMerge.contains(segment.getSegmentNo)) {
+                val segmentFile = segment.getSegmentFileName
+                val sfs = new SegmentFileStore(carbonMainTable.getTablePath, segmentFile)

Review comment:
       Done.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
##########
@@ -85,7 +87,21 @@ class AlterTableMergeIndexSIEventListener
                   .asScala
                 val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
                 validSegments.foreach { segment =>
-                  validSegmentIds += segment.getSegmentNo
+                  val segmentFile = segment.getSegmentFileName
+                  val sfs = new SegmentFileStore(indexCarbonTable.getTablePath, segmentFile)

Review comment:
       Done.

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -129,8 +127,13 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
     String partitionTempPath = "";
     for (String partition : partitionInfo) {
       if (partitionPath.equalsIgnoreCase(partition)) {
-        partitionTempPath = partition + "/" + tempFolderPath;
-        break;
+        if (tempFolderPath != null) {
+          partitionTempPath = partition + "/" + tempFolderPath;
+          break;
+        } else {
+          fileStore.readAllIIndexOfSegment(partition);

Review comment:
       Removed; edited to use the `mergeIndexFilesInPartitionedSegment` flow for alter add hive partition.



