You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by GitBox <gi...@apache.org> on 2021/02/02 11:32:41 UTC

[GitHub] [carbondata] kunal642 commented on a change in pull request #4070: [CARBONDATA-4082] Fix alter table add segment query on adding a segment having delete delta files.

kunal642 commented on a change in pull request #4070:
URL: https://github.com/apache/carbondata/pull/4070#discussion_r568530696



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
##########
@@ -294,6 +297,49 @@ case class CarbonAddLoadCommand(
       OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
     }
 
+    val deltaFiles = FileFactory.getCarbonFile(segmentPath).listFiles()
+      .filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
+    if (deltaFiles.length > 0) {
+      val blockNameToDeltaFilesMap =
+        collection.mutable.Map[String, collection.mutable.ListBuffer[(CarbonFile, String)]]()
+      deltaFiles.foreach { deltaFile =>
+        val tmpDeltaFilePath = deltaFile.getAbsolutePath
+          .replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+            CarbonCommonConstants.FILE_SEPARATOR)
+        val deltaFilePathElements = tmpDeltaFilePath.split(CarbonCommonConstants.FILE_SEPARATOR)
+        if (deltaFilePathElements != null && deltaFilePathElements.nonEmpty) {
+          val deltaFileName = deltaFilePathElements(deltaFilePathElements.length - 1)
+          val blockName = CarbonTablePath.DataFileUtil
+            .getBlockNameFromDeleteDeltaFile(deltaFileName)
+          if (blockNameToDeltaFilesMap.contains(blockName)) {
+            blockNameToDeltaFilesMap(blockName) += ((deltaFile, deltaFileName))
+          } else {
+            val deltaFileList = new ListBuffer[(CarbonFile, String)]()
+            deltaFileList += ((deltaFile, deltaFileName))
+            blockNameToDeltaFilesMap.put(blockName, deltaFileList)
+          }
+        }
+      }
+      val segmentUpdateDetails = new util.ArrayList[SegmentUpdateDetails]()
+      val columnCompressor = CompressorFactory.getInstance.getCompressor.getName
+      blockNameToDeltaFilesMap.foreach { entry =>
+        val segmentUpdateDetail = new SegmentUpdateDetails()
+        segmentUpdateDetail.setBlockName(entry._1)
+        segmentUpdateDetail.setActualBlockName(
+          entry._1 + CarbonCommonConstants.POINT + columnCompressor +
+            CarbonCommonConstants.FACT_FILE_EXT)
+        segmentUpdateDetail.setSegmentName(model.getSegmentId)
+        setMinMaxDeltaStampAndDeletedRowCount(entry._2, segmentUpdateDetail)
+        segmentUpdateDetails.add(segmentUpdateDetail)
+      }
+      val timestamp = System.currentTimeMillis().toString
+      val segmentDetails = new util.HashSet[Segment]()
+      segmentDetails.add(model.getSegment)
+      CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, carbonTable, timestamp, false)

Review comment:
       can we pass a check like forcewrite in the updateSegmentStatus to avoid the validation of the segment from tablestaus file.. this flag would be true in addload command when delete delta is present. This way you can avoid writing twice.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org