You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2021/06/23 07:57:06 UTC
[carbondata] branch master updated: [CARBONDATA-4225] Fix Update
performance issues when auto merge compaction is enabled
This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new d4ddd07 [CARBONDATA-4225] Fix Update performance issues when auto merge compaction is enabled
d4ddd07 is described below
commit d4ddd071c3df625305ff33af0a2c83ac57f405cd
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Mon Jun 21 19:15:55 2021 +0530
[CARBONDATA-4225] Fix Update performance issues when auto merge compaction is enabled
Why is this PR needed?
1. When auto-compaction is enabled, an update tries to run compaction right after its
Insert step. Carbon does not allow compaction to run concurrently with an update, so
auto-compaction fails with an exception only after exhausting multiple lock retries.
2. dataframe.rdd.isEmpty launches a Spark job. It was called twice in the same flow,
and the result of the first call was not reused.
What changes were proposed in this PR?
1. Skip auto-compaction during Update.
2. Compute dataframe.rdd.isEmpty once and reuse the result, avoiding a second job launch.
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4156
---
.../spark/rdd/CarbonDataRDDFactory.scala | 20 +++++++++++++----
.../command/management/CommonLoadUtils.scala | 22 ++++++++++---------
.../testsuite/iud/UpdateCarbonTableTestCase.scala | 25 ++++++++++++++++++++++
3 files changed, 53 insertions(+), 14 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index a6b8832..0449fba 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -312,9 +312,11 @@ object CarbonDataRDDFactory {
val segmentLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) + LockUsage.LOCK)
+ // dataFrame.get.rdd.isEmpty() will launch a job, so avoid calling it multiple times
+ val isEmptyDataframe = updateModel.isDefined && dataFrame.get.rdd.isEmpty()
try {
if (!carbonLoadModel.isCarbonTransactionalTable || segmentLock.lockWithRetries()) {
- if (updateModel.isDefined && dataFrame.get.rdd.isEmpty()) {
+ if (isEmptyDataframe) {
// if the rowToBeUpdated is empty, mark created segment as marked for delete and return
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, "")
} else {
@@ -473,7 +475,7 @@ object CarbonDataRDDFactory {
LOGGER.info("********clean up done**********")
throw new Exception(status(0)._2._2.errorMsg)
}
- if (updateModel.isDefined && dataFrame.get.rdd.isEmpty()) {
+ if (isEmptyDataframe) {
return null
}
// as no record loaded in new segment, new segment should be deleted
@@ -556,7 +558,11 @@ object CarbonDataRDDFactory {
// Release the segment lock, once table status is finally updated
segmentLock.unlock()
if (isLoadingCommitted) {
- triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext)
+ triggerEventsAfterLoading(sqlContext,
+ carbonLoadModel,
+ hadoopConf,
+ operationContext,
+ updateModel.isDefined)
}
}
}
@@ -565,13 +571,19 @@ object CarbonDataRDDFactory {
sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
hadoopConf: Configuration,
- operationContext: OperationContext): Unit = {
+ operationContext: OperationContext,
+ isUpdateOperation: Boolean): Unit = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
// code to handle Pre-Priming cache for loading
if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
}
+ if (isUpdateOperation) {
+ // During Update, we cannot perform compaction, as concurrent update and compaction is not
+ // allowed. If Update flow, then just return
+ return
+ }
try {
// compaction handling
if (carbonTable.isHivePartitionTable) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index e3b5ac0..fc19a9b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -1101,16 +1101,18 @@ object CommonLoadUtils {
}
try {
val compactedSegments = new util.ArrayList[String]()
- // Trigger auto compaction
- CarbonDataRDDFactory.handleSegmentMerging(
- loadParams.sparkSession.sqlContext,
- loadParams.carbonLoadModel
- .getCopyWithPartition(loadParams.carbonLoadModel.getCsvHeader,
- loadParams.carbonLoadModel.getCsvDelimiter),
- table,
- compactedSegments,
- loadParams.operationContext)
- loadParams.carbonLoadModel.setMergedSegmentIds(compactedSegments)
+ if (loadParams.updateModel.isEmpty) {
+ // Trigger auto compaction
+ CarbonDataRDDFactory.handleSegmentMerging(
+ loadParams.sparkSession.sqlContext,
+ loadParams.carbonLoadModel
+ .getCopyWithPartition(loadParams.carbonLoadModel.getCsvHeader,
+ loadParams.carbonLoadModel.getCsvDelimiter),
+ table,
+ compactedSegments,
+ loadParams.operationContext)
+ loadParams.carbonLoadModel.setMergedSegmentIds(compactedSegments)
+ }
} catch {
case e: Exception =>
LOGGER.error(
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 09fd237..fdd50e7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -29,6 +29,7 @@ import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.CarbonLockUtil
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -1192,6 +1193,30 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
Seq(Row("abc", 1, "aa", "aaa")))
}
+ test("test update on table with auto-merge enabled") {
+ sql("""drop table if exists iud.autoMergeUpdate""").collect()
+ sql("""create table iud.autoMergeUpdate (c1 string,c2 int,c3 string,c5 string) STORED AS
+ |carbondata tblproperties('auto_load_merge'='true')""".stripMargin)
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.autoMergeUpdate""")
+ sql("update iud.autoMergeUpdate up_TAble set(up_table.C1)=('abc')")
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.autoMergeUpdate""")
+ val retryCount = CarbonLockUtil.getLockProperty(
+ CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK,
+ CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT)
+ val retryTimeOut = CarbonLockUtil.getLockProperty(
+ CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK,
+ CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK_DEFAULT)
+ val updateTime = System.currentTimeMillis()
+ sql("update iud.autoMergeUpdate up_TAble set(up_table.C1)=('abcd')")
+ assert((System.currentTimeMillis() - updateTime) < (retryCount * retryTimeOut * 1000L))
+ checkAnswer(sql("select * from iud.autoMergeUpdate"),
+ Seq(Row("abcd", 1, "aa", "aaa"), Row("abcd", 2, "bb", "bbb"),
+ Row("abcd", 3, "cc", "ccc"), Row("abcd", 4, "dd", "ddd"),
+ Row("abcd", 5, "ee", "eee"), Row("abcd", 1, "aa", "aaa"),
+ Row("abcd", 2, "bb", "bbb"), Row("abcd", 3, "cc", "ccc"),
+ Row("abcd", 4, "dd", "ddd"), Row("abcd", 5, "ee", "eee")))
+ }
+
test("test update atomicity when horizontal compaction fails") {
sql("drop table if exists iud.zerorows")
sql("create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED AS carbondata")