Posted to commits@carbondata.apache.org by li...@apache.org on 2020/06/30 02:33:12 UTC

[carbondata] branch master updated: [CARBONDATA-3877] Reduce read tablestatus overhead during inserting into partition table

This is an automated email from the ASF dual-hosted git repository.

liuzhi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d316fc  [CARBONDATA-3877] Reduce read tablestatus overhead during inserting into partition table
1d316fc is described below

commit 1d316fc0ee3af9ecfda7208379051905b8460dde
Author: haomarch <ma...@126.com>
AuthorDate: Sun Jun 28 19:21:19 2020 +0800

    [CARBONDATA-3877] Reduce read tablestatus overhead during inserting into partition table
    
    Why is this PR needed?
    Currently, inserting into a partition table performs many tablestatus read operations. When the table status file is stored in an object store, reading it can fail (with an IOException or JsonSyntaxException) while the file is being modified, which leads to a high failure rate for concurrent inserts into a partition table.
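    A minimal Scala sketch of the failure mode described above, using only the
    SegmentStatusManager.readLoadMetadata API that appears in the diff below. The
    retry helper, its name, and its parameters are illustrative; the PR itself
    avoids the problem by reading tablestatus less often, not by retrying:

        import java.io.IOException
        import com.google.gson.JsonSyntaxException
        import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}

        // Illustrative only: read tablestatus, re-reading on the transient failures
        // (IOException / JsonSyntaxException) that can occur while the file is
        // being rewritten in an object store.
        def readStatusWithRetry(metadataPath: String,
            retriesLeft: Int = 3): Array[LoadMetadataDetails] = {
          try {
            SegmentStatusManager.readLoadMetadata(metadataPath)
          } catch {
            case _: IOException | _: JsonSyntaxException if retriesLeft > 0 =>
              Thread.sleep(100) // back off briefly, then re-read
              readStatusWithRetry(metadataPath, retriesLeft - 1)
          }
        }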
    
    What changes were proposed in this PR?
    (1) Three code paths were removed: calculate sizeInBytes, clean segments, and deleteLoadsAndUpdateMetadata.
    'calculate sizeInBytes' is not needed in the insert-into flow. 'clean segments' and 'deleteLoadsAndUpdateMetadata' are already covered by the 'clean files' command, so both can be removed from the insert-into flow.
    (2) Reduce duplicate tablestatus operations and narrow the conditions under which tablestatus is read (see the sketch below).
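    A rough Scala sketch of the idea behind (2), mirroring the new check added in
    SparkCarbonTableFormat further down. The helper name and signature are
    illustrative; only SegmentStatusManager.readLoadMetadata and
    LoadMetadataDetails come from the codebase:

        import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}

        // Prefer the load details already cached on the load model; only read the
        // tablestatus file when the current entry is not present in the cache.
        def detailsFor(cached: Seq[LoadMetadataDetails],
            current: LoadMetadataDetails,
            metadataPath: String): Seq[LoadMetadataDetails] = {
          if (cached.exists(_.getLoadName == current.getLoadName)) {
            cached
          } else {
            SegmentStatusManager.readLoadMetadata(metadataPath).toSeq :+ current
          }
        }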
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3800
---
 .../carbondata/core/metadata/SegmentFileStore.java  | 17 +++++++++++++++++
 .../hadoop/api/CarbonOutputCommitter.java           | 11 +++++++----
 .../org/apache/spark/rdd/CarbonMergeFilesRDD.scala  |  7 ++++---
 .../management/CarbonInsertIntoCommand.scala        |  2 --
 .../command/management/CommonLoadUtils.scala        |  5 +----
 .../datasources/SparkCarbonTableFormat.scala        | 21 ++++++++++++++++-----
 .../SILoadEventListenerForFailedSegments.scala      |  3 ++-
 .../loading/TableProcessingOperations.java          |  3 +--
 8 files changed, 48 insertions(+), 21 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 335e0f5..2f274c3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -1149,6 +1149,23 @@ public class SegmentFileStore {
    * @throws IOException
    */
   public static List<PartitionSpec> getPartitionSpecs(String segmentId, String tablePath,
+      String segmentFilePath, String loadStartTime) throws IOException {
+    SegmentFileStore fileStore = new SegmentFileStore(tablePath, segmentFilePath);
+    List<PartitionSpec> partitionSpecs = fileStore.getPartitionSpecs();
+    for (PartitionSpec spec : partitionSpecs) {
+      spec.setUuid(segmentId + "_" + loadStartTime);
+    }
+    return partitionSpecs;
+  }
+
+  /**
+   * Get the partition specs of the segment
+   * @param segmentId
+   * @param tablePath
+   * @return
+   * @throws IOException
+   */
+  public static List<PartitionSpec> getPartitionSpecs(String segmentId, String tablePath,
       LoadMetadataDetails[] details)
       throws IOException {
     LoadMetadataDetails segEntry = null;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 02c8d4c..4b8fc43 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -232,10 +232,13 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     String segmentsToBeDeleted =
         context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
     List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
-    Set<Segment> segmentSet = new HashSet<>(
-        new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
-            context.getConfiguration()).getValidAndInvalidSegments(carbonTable.isMV())
-            .getValidSegments());
+    Set<Segment> segmentSet = new HashSet<>();
+    if (updateTime != null || uniqueId != null) {
+      segmentSet = new HashSet<>(
+          new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
+              context.getConfiguration()).getValidAndInvalidSegments(carbonTable.isMV())
+                  .getValidSegments());
+    }
     if (updateTime != null) {
       CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
           segmentDeleteList);
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index ebac5e4..695ee27 100644
--- a/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -229,14 +229,15 @@ class CarbonMergeFilesRDD(
 
   override def internalGetPartitions: Array[Partition] = {
     if (isHivePartitionedTable) {
-      val metadataDetails = SegmentStatusManager
-        .readLoadMetadata(CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
       // in case of partition table make rdd partitions per partition of the carbon table
       val partitionPaths: java.util.Map[String, java.util.List[String]] = new java.util.HashMap()
       if (partitionInfo == null || partitionInfo.isEmpty) {
         segments.foreach(segment => {
+          val loadStartTime = segmentFileNameToSegmentIdMap.get(segment)
+          val segmentFileName = SegmentFileStore.genSegmentFileName(
+            segment, loadStartTime) + CarbonTablePath.SEGMENT_EXT
           val partitionSpecs = SegmentFileStore
-            .getPartitionSpecs(segment, carbonTable.getTablePath, metadataDetails)
+            .getPartitionSpecs(segment, carbonTable.getTablePath, segmentFileName, loadStartTime)
             .asScala.map(_.getLocation.toString)
           partitionPaths.put(segment, partitionSpecs.asJava)
         })
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 8dfad76..8c14917 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -202,8 +202,6 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
           updateModel = None,
           operationContext = operationContext)
 
-      // Clean up the old invalid segment data before creating a new entry for new load.
-      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
       // add the start entry for the new load in the table status file
       if (!table.isHivePartitionTable) {
         CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 20d29d8..71649b8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -126,7 +126,6 @@ object CommonLoadUtils {
             TableIdentifier(tableName, databaseNameOp))).collect {
           case l: LogicalRelation => l
         }.head
-      sizeInBytes = logicalPartitionRelation.relation.sizeInBytes
       finalPartition = getCompletePartitionValues(partition, table)
     }
     (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition)
@@ -843,8 +842,6 @@ object CommonLoadUtils {
   def loadDataWithPartition(loadParams: CarbonLoadParams): Seq[Row] = {
     val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val catalogTable: CatalogTable = loadParams.logicalPartitionRelation.catalogTable.get
-    // Clean up the already dropped partitioned data
-    SegmentFileStore.cleanSegments(table, null, false)
     CarbonUtils.threadSet("partition.operationcontext", loadParams.operationContext)
     val attributes = if (loadParams.scanResultRDD.isDefined) {
       // take the already re-arranged attributes
@@ -1102,7 +1099,7 @@ object CommonLoadUtils {
     val specs =
       SegmentFileStore.getPartitionSpecs(loadParams.carbonLoadModel.getSegmentId,
         loadParams.carbonLoadModel.getTablePath,
-        SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(table.getTablePath)))
+        loadParams.carbonLoadModel.getLoadMetadataDetails.asScala.toArray)
     if (specs != null) {
       specs.asScala.map { spec =>
         Row(spec.getPartitions.asScala.mkString("/"), spec.getLocation.toString, spec.getUuid)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index f1f0b80..225daf5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -159,13 +159,16 @@ with Serializable {
     if (currEntry != null) {
       val loadEntry =
         ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails]
-      val details =
-        SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
       model.setSegmentId(loadEntry.getLoadName)
       model.setFactTimeStamp(loadEntry.getLoadStartTime)
-      val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
-      list.add(loadEntry)
-      model.setLoadMetadataDetails(list)
+      if (!isLoadDetailsContainTheCurrentEntry(
+        model.getLoadMetadataDetails.asScala.toArray, loadEntry)) {
+        val details =
+          SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
+        val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
+        list.add(loadEntry)
+        model.setLoadMetadataDetails(list)
+      }
     }
     // Set the update timestamp if user sets in case of update query. It needs to be updated
     // in load status update time
@@ -224,6 +227,14 @@ with Serializable {
     }
   }
   override def equals(other: Any): Boolean = other.isInstanceOf[SparkCarbonTableFormat]
+
+  private def isLoadDetailsContainTheCurrentEntry(
+      loadDetails: Array[LoadMetadataDetails],
+      currentEntry: LoadMetadataDetails): Boolean = {
+    (loadDetails.length - 1 to 0 by -1).exists { index =>
+      loadDetails(index).getLoadName.equals(currentEntry.getLoadName)
+    }
+  }
 }
 
 case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index 2e6a441..2071385 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -64,7 +64,6 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
           .lookupRelation(Some(carbonLoadModel.getDatabaseName),
             carbonLoadModel.getTableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
         val indexMetadata = carbonTable.getIndexMetadata
-        val mainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
         val secondaryIndexProvider = IndexType.SI.getIndexProviderName
         if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
             null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
@@ -72,6 +71,8 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
             .get(secondaryIndexProvider).keySet().asScala
           // if there are no index tables for a given fact table do not perform any action
           if (indexTables.nonEmpty) {
+            val mainTableDetails =
+              SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
             indexTables.foreach {
               indexTableName =>
                 val isLoadSIForFailedSegments = sparkSession.sessionState.catalog
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index 358295d..f0f14d6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -54,11 +54,10 @@ public class TableProcessingOperations {
   public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       final boolean isCompactionFlow) throws IOException {
     String metaDataLocation = carbonTable.getMetadataPath();
-    final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-
     //delete folder which metadata no exist in tablestatus
     String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
     if (FileFactory.isFileExist(partitionPath)) {
+      final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
       CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath);
       CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
         @Override