Posted to commits@carbondata.apache.org by in...@apache.org on 2022/02/14 15:56:38 UTC

[carbondata] branch master updated: [CARBONDATA-4322] Apply local sort task level property for insert

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 59f23c0  [CARBONDATA-4322] Apply local sort task level property for insert
59f23c0 is described below

commit 59f23c0dfea74199f97b6d3626c31d8fba4a2e1f
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Mon Jan 24 18:07:19 2022 +0530

    [CARBONDATA-4322] Apply local sort task level property for insert
    
    Why is this PR needed?
    Currently, when carbon.partition.data.on.tasklevel is enabled with
    local sort, the number of tasks launched for a load is based on
    node locality. For the insert command, however, the local sort
    task-level property is not applied, so the number of tasks launched
    is based on the number of input files.
    
    What changes were proposed in this PR?
    Applied the carbon.partition.data.on.tasklevel property to the insert
    command as well. Used DataLoadCoalescedRDD to coalesce the partitions
    and a new DataLoadCoalescedUnwrapRDD to unwrap the partitions from
    DataLoadPartitionWrap and iterate over them. A usage sketch follows.
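    
    A minimal, illustrative sketch (not part of this commit) of how the
    property is used with an insert, mirroring the new test below. It
    assumes a SparkSession named `spark`; the table names `sales` and
    `staging_sales` are hypothetical.
    
        import org.apache.carbondata.core.constants.CarbonCommonConstants
        import org.apache.carbondata.core.util.CarbonProperties
    
        // Enable task-level partitioning of load/insert data (set
        // programmatically here; it can also come from carbon.properties).
        CarbonProperties.getInstance()
          .addProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL, "TRUE")
    
        // Hypothetical local-sort partitioned table.
        spark.sql(
          """CREATE TABLE sales (id INT, amount INT)
            | PARTITIONED BY (country STRING)
            | STORED AS carbondata
            | TBLPROPERTIES('sort_scope'='local_sort', 'sort_columns'='id')""".stripMargin)
    
        // With this change, the insert path also coalesces input partitions per
        // node (DataLoadCoalescedRDD) instead of launching one task per input file.
        spark.sql("INSERT INTO sales SELECT id, amount, country FROM staging_sales")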
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4248
---
 .../command/management/CommonLoadUtils.scala       | 75 ++++++++++++++++++++--
 .../StandardPartitionTableCompactionTestCase.scala | 54 ++++++++++++++++
 2 files changed, 124 insertions(+), 5 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 6996b35..bdb3054 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -23,10 +23,12 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.rdd.RDD
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, DataLoadPartitionWrap, RDD}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
@@ -36,6 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.UpdateTableModel
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
+import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SparkSQLUtil
@@ -896,6 +899,9 @@ object CommonLoadUtils {
       Array[String]()
     }
     var persistedRDD: Option[RDD[InternalRow]] = None
+    val partitionBasedOnLocality = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
+        CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT).toBoolean
     try {
       val query: LogicalPlan = if ((loadParams.dataFrame.isDefined) ||
                                    loadParams.scanResultRDD.isDefined) {
@@ -982,9 +988,26 @@ object CommonLoadUtils {
           persistedRDD = persistedRDDLocal
           transformedPlan
         } else {
+          val rdd = loadParams.scanResultRDD.get
+          val newRdd =
+            if (sortScope == SortScopeOptions.SortScope.LOCAL_SORT && partitionBasedOnLocality) {
+              val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
+                DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
+              }.distinct.length
+              val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
+                nodeNumOfData,
+                loadParams.sparkSession.sqlContext.sparkContext)
+              val coalescedRdd = new DataLoadCoalescedRDD[InternalRow](
+                loadParams.sparkSession,
+                rdd,
+                nodes.toArray.distinct)
+              new DataLoadCoalescedUnwrapRDD(coalescedRdd)
+            } else {
+              rdd
+            }
           val (transformedPlan, partitions, persistedRDDLocal) =
             CommonLoadUtils.transformQueryWithInternalRow(
-              loadParams.scanResultRDD.get,
+              newRdd,
               loadParams.sparkSession,
               loadParams.carbonLoadModel,
               partitionValues,
@@ -999,9 +1022,6 @@ object CommonLoadUtils {
         }
       } else {
         val columnCount = loadParams.carbonLoadModel.getCsvHeaderColumns.length
-        val partitionBasedOnLocality = CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
-            CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT).toBoolean
         val rdd =
           if (sortScope == SortScopeOptions.SortScope.LOCAL_SORT && partitionBasedOnLocality) {
             CsvRDDHelper.csvFileScanRDDForLocalSort(
@@ -1208,3 +1228,48 @@ object CommonLoadUtils {
     segmentMetaDataInfo
   }
 }
+
+/**
+ *  It unwraps partitions from DataLoadPartitionWrap
+ */
+class DataLoadCoalescedUnwrapRDD[T: ClassTag](@transient var prev: RDD[T])
+  extends RDD[InternalRow](prev) {
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    new Iterator[InternalRow] {
+
+      final val partitionIterator = firstParent[DataLoadPartitionWrap[InternalRow]].iterator(split,
+        context)
+      private var rddIter: Iterator[InternalRow] = null
+
+      override def hasNext: Boolean = {
+        // If rddIter is already initialised, check and return
+        if (rddIter != null && rddIter.hasNext) {
+          true
+        } else {
+          internalHasNext()
+        }
+      }
+
+      def internalHasNext(): Boolean = {
+        if (partitionIterator.hasNext) {
+          val value = partitionIterator.next()
+          rddIter = value.rdd.iterator(value.partition, context)
+          var hasNext = rddIter.hasNext
+          // If iterator is finished then check for next iterator.
+          if (!hasNext) {
+            hasNext = internalHasNext()
+          }
+          hasNext
+        } else {
+          false
+        }
+      }
+
+      override def next(): InternalRow = {
+        rddIter.next()
+      }
+    }
+  }
+
+  override protected def getPartitions: Array[Partition] = prev.partitions
+}
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
index 2d00c69..24de7b9 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -140,6 +140,59 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     }
   }
 
+  test("data insert and compaction for local sort partition table based on task id ") {
+    sql("drop table if exists partitionthree")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL, "TRUE")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,3")
+    try {
+      sql("""
+            | CREATE TABLE partitionthree (empno int, doj Timestamp,
+            |  workgroupcategoryname String, deptno int, deptname String,
+            |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+            |  utilization int,salary int)
+            | PARTITIONED BY (workgroupcategory int, empname String, designation String)
+            | STORED AS carbondata
+            | tblproperties('sort_scope'='local_sort', 'sort_columns'='deptname,empname')
+      """.stripMargin)
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+      assert(getPartitionDataFilesCount("partitionthree", "/workgroupcategory=1/empname=arvind/designation=SE/") == 1)
+      sql("drop table if exists partitionthree_hive")
+      sql("CREATE TABLE partitionthree_hive (empno int, doj Timestamp, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) PARTITIONED BY (workgroupcategory int, empname String, designation String)")
+      sql("set hive.exec.dynamic.partition.mode=nonstrict")
+      sql("insert into partitionthree_hive select * from partitionthree")
+      sql("insert into partitionthree_hive select * from partitionthree")
+      sql("insert into partitionthree_hive select * from partitionthree")
+
+      sql("insert into partitionthree select * from partitionthree_hive")
+      assert(getPartitionDataFilesCount("partitionthree", "/workgroupcategory=1/empname=arvind/designation=SE/") == 2)
+      sql("ALTER TABLE partitionthree COMPACT 'MINOR'").collect()
+      sql("clean files for table partitionthree options('force'='true')")
+      assert(getPartitionDataFilesCount("partitionthree", "/workgroupcategory=1/empname=arvind/designation=SE/") == 1)
+      checkExistence(sql("show segments for table partitionthree"), true, "0.1")
+      checkAnswer(sql(
+        "select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, " +
+        "deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization," +
+        " salary from partitionthree where workgroupcategory=1 and empname='arvind' and " +
+        "designation='SE' order by empno"),
+        sql(
+          "select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, " +
+          "deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, " +
+          "utilization, salary from originTable where workgroupcategory=1 and empname='arvind' " +
+          "and designation='SE' order by empno"))
+    } finally {
+      sql("drop table if exists partitionthree")
+      sql("drop table if exists partitionthree_hive")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL,
+          CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL_DEFAULT)
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+          CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+    }
+  }
+
   private def getPartitionDataFilesCount(tableName: String, partition: String) = {
     val table: CarbonTable = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
     FileFactory.getCarbonFile(table.getTablePath + partition)
@@ -280,6 +333,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     sql("drop table if exists partitionone")
     sql("drop table if exists partitiontwo")
     sql("drop table if exists partitionthree")
+    sql("drop table if exists partitionthree_hive")
     sql("drop table if exists partitionmajor")
     sql("drop table if exists staticpartition")
     sql("drop table if exists staticpartitioncompaction")