You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2021/06/23 07:57:06 UTC

[carbondata] branch master updated: [CARBONDATA-4225] Fix Update performance issues when auto merge compaction is enabled

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d4ddd07  [CARBONDATA-4225] Fix Update performance issues when auto merge compaction is enabled
d4ddd07 is described below

commit d4ddd071c3df625305ff33af0a2c83ac57f405cd
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Mon Jun 21 19:15:55 2021 +0530

    [CARBONDATA-4225] Fix Update performance issues when auto merge compaction is enabled
    
    Why is this PR needed?
    1. When auto-compaction is enabled, an update tries to trigger compaction after its
       insert step. That auto-compaction throws an exception after multiple retries,
       because Carbon does not allow compaction and update to run concurrently.
    2. dataframe.rdd.isEmpty launches a job. It is called twice in the code, and the
       result of the first call is not reused.
    
    What changes were proposed in this PR?
    1. Skip auto-compaction during update.
    2. Evaluate dataframe.rdd.isEmpty once and reuse the result, avoiding a second job.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4156
---
 .../spark/rdd/CarbonDataRDDFactory.scala           | 20 +++++++++++++----
 .../command/management/CommonLoadUtils.scala       | 22 ++++++++++---------
 .../testsuite/iud/UpdateCarbonTableTestCase.scala  | 25 ++++++++++++++++++++++
 3 files changed, 53 insertions(+), 14 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index a6b8832..0449fba 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -312,9 +312,11 @@ object CarbonDataRDDFactory {
     val segmentLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
       CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) + LockUsage.LOCK)
 
+    // dataFrame.get.rdd.isEmpty() will launch a job, so avoid calling it multiple times
+    val isEmptyDataframe = updateModel.isDefined && dataFrame.get.rdd.isEmpty()
     try {
       if (!carbonLoadModel.isCarbonTransactionalTable || segmentLock.lockWithRetries()) {
-        if (updateModel.isDefined && dataFrame.get.rdd.isEmpty()) {
+        if (isEmptyDataframe) {
           // if the rowToBeUpdated is empty, mark created segment as marked for delete and return
           CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, "")
         } else {
@@ -473,7 +475,7 @@ object CarbonDataRDDFactory {
           LOGGER.info("********clean up done**********")
           throw new Exception(status(0)._2._2.errorMsg)
         }
-        if (updateModel.isDefined && dataFrame.get.rdd.isEmpty()) {
+        if (isEmptyDataframe) {
           return null
         }
         // as no record loaded in new segment, new segment should be deleted
@@ -556,7 +558,11 @@ object CarbonDataRDDFactory {
       // Release the segment lock, once table status is finally updated
       segmentLock.unlock()
       if (isLoadingCommitted) {
-        triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext)
+        triggerEventsAfterLoading(sqlContext,
+          carbonLoadModel,
+          hadoopConf,
+          operationContext,
+          updateModel.isDefined)
       }
     }
   }
@@ -565,13 +571,19 @@ object CarbonDataRDDFactory {
       sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       hadoopConf: Configuration,
-      operationContext: OperationContext): Unit = {
+      operationContext: OperationContext,
+      isUpdateOperation: Boolean): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     // code to handle Pre-Priming cache for loading
     if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
       DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
         operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
     }
+    if (isUpdateOperation) {
+      // During Update, we cannot perform compaction, as concurrent update and compaction is not
+      // allowed. If Update flow, then just return
+      return
+    }
     try {
       // compaction handling
       if (carbonTable.isHivePartitionTable) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index e3b5ac0..fc19a9b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -1101,16 +1101,18 @@ object CommonLoadUtils {
     }
     try {
       val compactedSegments = new util.ArrayList[String]()
-      // Trigger auto compaction
-      CarbonDataRDDFactory.handleSegmentMerging(
-        loadParams.sparkSession.sqlContext,
-        loadParams.carbonLoadModel
-          .getCopyWithPartition(loadParams.carbonLoadModel.getCsvHeader,
-            loadParams.carbonLoadModel.getCsvDelimiter),
-        table,
-        compactedSegments,
-        loadParams.operationContext)
-      loadParams.carbonLoadModel.setMergedSegmentIds(compactedSegments)
+      if (loadParams.updateModel.isEmpty) {
+        // Trigger auto compaction
+        CarbonDataRDDFactory.handleSegmentMerging(
+          loadParams.sparkSession.sqlContext,
+          loadParams.carbonLoadModel
+            .getCopyWithPartition(loadParams.carbonLoadModel.getCsvHeader,
+              loadParams.carbonLoadModel.getCsvDelimiter),
+          table,
+          compactedSegments,
+          loadParams.operationContext)
+        loadParams.carbonLoadModel.setMergedSegmentIds(compactedSegments)
+      }
     } catch {
       case e: Exception =>
         LOGGER.error(
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 09fd237..fdd50e7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -29,6 +29,7 @@ import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.CarbonLockUtil
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -1192,6 +1193,30 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
      Seq(Row("abc", 1, "aa", "aaa")))
   }
 
+  test("test update on table with auto-merge enabled") {
+    sql("""drop table if exists iud.autoMergeUpdate""").collect()
+    sql("""create table iud.autoMergeUpdate (c1 string,c2 int,c3 string,c5 string) STORED AS
+        |carbondata tblproperties('auto_load_merge'='true')""".stripMargin)
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.autoMergeUpdate""")
+    sql("update iud.autoMergeUpdate up_TAble set(up_table.C1)=('abc')")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.autoMergeUpdate""")
+    val retryCount = CarbonLockUtil.getLockProperty(
+      CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK,
+      CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT)
+    val retryTimeOut = CarbonLockUtil.getLockProperty(
+      CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK,
+      CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK_DEFAULT)
+    val updateTime = System.currentTimeMillis()
+    sql("update iud.autoMergeUpdate up_TAble set(up_table.C1)=('abcd')")
+    assert((System.currentTimeMillis() - updateTime) < (retryCount * retryTimeOut * 1000L))
+    checkAnswer(sql("select * from  iud.autoMergeUpdate"),
+      Seq(Row("abcd", 1, "aa", "aaa"), Row("abcd", 2, "bb", "bbb"),
+        Row("abcd", 3, "cc", "ccc"), Row("abcd", 4, "dd", "ddd"),
+        Row("abcd", 5, "ee", "eee"), Row("abcd", 1, "aa", "aaa"),
+        Row("abcd", 2, "bb", "bbb"), Row("abcd", 3, "cc", "ccc"),
+        Row("abcd", 4, "dd", "ddd"), Row("abcd", 5, "ee", "eee")))
+  }
+
   test("test update atomicity when horizontal compaction fails") {
     sql("drop table if exists iud.zerorows")
     sql("create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED AS carbondata")