You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by in...@apache.org on 2022/02/14 15:56:38 UTC
[carbondata] branch master updated: [CARBONDATA-4322] Apply local sort task level property for insert
This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 59f23c0 [CARBONDATA-4322] Apply local sort task level property for insert
59f23c0 is described below
commit 59f23c0dfea74199f97b6d3626c31d8fba4a2e1f
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Mon Jan 24 18:07:19 2022 +0530
[CARBONDATA-4322] Apply local sort task level property for insert
Why is this PR needed?
Currently, when carbon.partition.data.on.tasklevel is enabled with
local sort, the number of tasks launched for load is based on
node locality. But for the insert command, the local sort task level
property is not applied, which causes the number of tasks launched
to be based on the input files.
What changes were proposed in this PR?
Included changes to apply the carbon.partition.data.on.tasklevel property
for the insert command as well. Used DataLoadCoalescedRDD to coalesce
the partitions and a DataLoadCoalescedUnwrapRDD to unwrap the partitions
from DataLoadPartitionWrap and iterate over them.
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4248
---
.../command/management/CommonLoadUtils.scala | 75 ++++++++++++++++++++--
.../StandardPartitionTableCompactionTestCase.scala | 54 ++++++++++++++++
2 files changed, 124 insertions(+), 5 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 6996b35..bdb3054 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -23,10 +23,12 @@ import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.rdd.RDD
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, DataLoadPartitionWrap, RDD}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
@@ -36,6 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.command.UpdateTableModel
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
+import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SparkSQLUtil
@@ -896,6 +899,9 @@ object CommonLoadUtils {
Array[String]()
}
var persistedRDD: Option[RDD[InternalRow]] = None
+ val partitionBasedOnLocality = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
+ CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT).toBoolean
try {
val query: LogicalPlan = if ((loadParams.dataFrame.isDefined) ||
loadParams.scanResultRDD.isDefined) {
@@ -982,9 +988,26 @@ object CommonLoadUtils {
persistedRDD = persistedRDDLocal
transformedPlan
} else {
+ val rdd = loadParams.scanResultRDD.get
+ val newRdd =
+ if (sortScope == SortScopeOptions.SortScope.LOCAL_SORT && partitionBasedOnLocality) {
+ val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
+ DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
+ }.distinct.length
+ val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
+ nodeNumOfData,
+ loadParams.sparkSession.sqlContext.sparkContext)
+ val coalescedRdd = new DataLoadCoalescedRDD[InternalRow](
+ loadParams.sparkSession,
+ rdd,
+ nodes.toArray.distinct)
+ new DataLoadCoalescedUnwrapRDD(coalescedRdd)
+ } else {
+ rdd
+ }
val (transformedPlan, partitions, persistedRDDLocal) =
CommonLoadUtils.transformQueryWithInternalRow(
- loadParams.scanResultRDD.get,
+ newRdd,
loadParams.sparkSession,
loadParams.carbonLoadModel,
partitionValues,
@@ -999,9 +1022,6 @@ object CommonLoadUtils {
}
} else {
val columnCount = loadParams.carbonLoadModel.getCsvHeaderColumns.length
- val partitionBasedOnLocality = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
- CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT).toBoolean
val rdd =
if (sortScope == SortScopeOptions.SortScope.LOCAL_SORT && partitionBasedOnLocality) {
CsvRDDHelper.csvFileScanRDDForLocalSort(
@@ -1208,3 +1228,48 @@ object CommonLoadUtils {
segmentMetaDataInfo
}
}
+
+/**
+ * Unwraps each DataLoadPartitionWrap of the parent RDD and iterates the rows of its partition.
+ */
+class DataLoadCoalescedUnwrapRDD[T: ClassTag](@transient var prev: RDD[T])
+ extends RDD[InternalRow](prev) {
+ override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+ new Iterator[InternalRow] {
+ // The parent's elements for this split are read as DataLoadPartitionWrap[InternalRow].
+ final val partitionIterator = firstParent[DataLoadPartitionWrap[InternalRow]].iterator(split,
+ context)
+ private var rddIter: Iterator[InternalRow] = null
+ // rddIter stays null until hasNext has opened the first wrapped partition.
+ override def hasNext: Boolean = {
+ // Fast path: the wrapped partition currently being read still has rows.
+ if (rddIter != null && rddIter.hasNext) {
+ true
+ } else {
+ internalHasNext()
+ }
+ }
+ // Opens the next wrapped partition; returns false once partitionIterator is exhausted.
+ def internalHasNext(): Boolean = {
+ if (partitionIterator.hasNext) {
+ val value = partitionIterator.next()
+ rddIter = value.rdd.iterator(value.partition, context)
+ var hasNext = rddIter.hasNext
+ // An empty wrapped partition is skipped by recursing on to the following one.
+ if (!hasNext) {
+ hasNext = internalHasNext()
+ }
+ hasNext
+ } else {
+ false
+ }
+ }
+ // NOTE(review): rddIter is still null if next() is called before any hasNext call.
+ override def next(): InternalRow = {
+ rddIter.next()
+ }
+ }
+ }
+ // The partitions of prev are reused unchanged.
+ override protected def getPartitions: Array[Partition] = prev.partitions
+}
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
index 2d00c69..24de7b9 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -140,6 +140,59 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
}
}
+ test("data insert and compaction for local sort partition table based on task id ") {
+ sql("drop table if exists partitionthree")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL, "TRUE")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,3")
+ try { // both property overrides set above are put back to their defaults in finally
+ sql("""
+ | CREATE TABLE partitionthree (empno int, doj Timestamp,
+ | workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (workgroupcategory int, empname String, designation String)
+ | STORED AS carbondata
+ | tblproperties('sort_scope'='local_sort', 'sort_columns'='deptname,empname')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+ assert(getPartitionDataFilesCount("partitionthree", "/workgroupcategory=1/empname=arvind/designation=SE/") == 1)
+ sql("drop table if exists partitionthree_hive")
+ sql("CREATE TABLE partitionthree_hive (empno int, doj Timestamp, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) PARTITIONED BY (workgroupcategory int, empname String, designation String)")
+ sql("set hive.exec.dynamic.partition.mode=nonstrict") // NOTE(review): not reset in this test
+ sql("insert into partitionthree_hive select * from partitionthree")
+ sql("insert into partitionthree_hive select * from partitionthree")
+ sql("insert into partitionthree_hive select * from partitionthree")
+ // Insert the Hive rows back into the carbon table; the partition count is then expected to be 2.
+ sql("insert into partitionthree select * from partitionthree_hive")
+ assert(getPartitionDataFilesCount("partitionthree", "/workgroupcategory=1/empname=arvind/designation=SE/") == 2)
+ sql("ALTER TABLE partitionthree COMPACT 'MINOR'").collect() // segment 0.1 is checked below
+ sql("clean files for table partitionthree options('force'='true')")
+ assert(getPartitionDataFilesCount("partitionthree", "/workgroupcategory=1/empname=arvind/designation=SE/") == 1)
+ checkExistence(sql("show segments for table partitionthree"), true, "0.1")
+ checkAnswer(sql(
+ "select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, " +
+ "deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization," +
+ " salary from partitionthree where workgroupcategory=1 and empname='arvind' and " +
+ "designation='SE' order by empno"),
+ sql(
+ "select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, " +
+ "deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, " +
+ "utilization, salary from originTable where workgroupcategory=1 and empname='arvind' " +
+ "and designation='SE' order by empno"))
+ } finally { // drop both test tables and restore the two overridden properties
+ sql("drop table if exists partitionthree")
+ sql("drop table if exists partitionthree_hive")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
+ CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+ CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+ }
+ }
+
private def getPartitionDataFilesCount(tableName: String, partition: String) = {
val table: CarbonTable = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
FileFactory.getCarbonFile(table.getTablePath + partition)
@@ -280,6 +333,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
sql("drop table if exists partitionone")
sql("drop table if exists partitiontwo")
sql("drop table if exists partitionthree")
+ sql("drop table if exists partitionthree_hive")
sql("drop table if exists partitionmajor")
sql("drop table if exists staticpartition")
sql("drop table if exists staticpartitioncompaction")