Posted to dev@carbondata.apache.org by GitBox <gi...@apache.org> on 2021/05/19 07:22:19 UTC

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4130: [CARBONDATA-4183] Local sort Partition Load and Compaction fix

ajantha-bhat commented on a change in pull request #4130:
URL: https://github.com/apache/carbondata/pull/4130#discussion_r634921359



##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -974,6 +974,16 @@ private CarbonCommonConstants() {
 
   public static final String LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT = "0";
 
+  /**
+   * When enabled, tasks launched for a local sort partition load will be based on one task per node.
+   * Compaction will then be performed at the task level for each partition.

Review comment:
       Please also describe why this property is required and how it impacts memory and performance, both in this description and in the documentation.
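
   For reference, a minimal sketch of how the switch would be enabled (mirroring the test added in this PR; the memory/performance note in the comments is an assumption that the requested documentation should confirm):

```scala
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

// Enable one-task-per-node loading for local sort partition tables.
// Fewer, larger tasks per node tend to produce fewer small files,
// but each task sorts more rows, which can increase memory pressure
// (assumption: exact impact to be documented as requested above).
CarbonProperties.getInstance()
  .addProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL, "TRUE")
```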

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
##########
@@ -94,6 +94,45 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno"))
   }
 
+  test("data load and compaction for local sort partition table based on task id ") {
+    sql("drop table if exists partitionthree")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_PARTITION_DATA_BASED_ON_TASK_LEVEL, "TRUE")
+    try {
+      sql("""
+            | CREATE TABLE partitionthree (empno int, doj Timestamp,
+            |  workgroupcategoryname String, deptno int, deptname String,
+            |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+            |  utilization int,salary int)
+            | PARTITIONED BY (workgroupcategory int, empname String, designation String)
+            | STORED AS carbondata
+            | tblproperties('sort_scope'='local_sort', 'sort_columns'='deptname,empname')
+      """.stripMargin)
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)

Review comment:
       Please add some validation points, for example that the number of data files written equals the node count.
   I don't see any validation of the new feature in this test case.
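
   As an illustration, a hedged sketch of such a check (the table-path lookup and the file-walking helper are assumptions; in a local-mode test the node count is effectively 1, so the expected count per partition would come from the cluster the test actually runs on):

```scala
import java.io.File
import org.apache.spark.sql.CarbonEnv

// Recursively collect the .carbondata files written for the table
// (suitable for the local filesystem used by the test suite).
def listCarbonDataFiles(dir: File): Seq[File] = {
  val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
  children.filter(f => f.isFile && f.getName.endsWith(".carbondata")) ++
    children.filter(_.isDirectory).flatMap(listCarbonDataFiles)
}

val tablePath = CarbonEnv
  .getCarbonTable(None, "partitionthree")(sqlContext.sparkSession)
  .getTablePath
// Expected: the number of data files per partition matches the node count
// (one in this local test), rather than one file per input split.
assert(listCarbonDataFiles(new File(tablePath)).nonEmpty)
```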

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
##########
@@ -712,10 +731,63 @@ class CarbonMergerRDD[K, V](
     }
   }
 
+  /**
+   * Group the data files based on task Id per partition
+   */
+  private def getTaskNumberBasedOnID(
+      split: CarbonInputSplit,
+      partitionTaskMap: util.Map[PartitionSpec, util.HashMap[String, Int]],
+      counter: AtomicInteger): String = {
+    if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+      val path = split.getPath.getParent
+      val partTask =
+        carbonMergerMapping.currentPartitions.get.find(p => p.getLocation.equals(path)) match {
+          case Some(part) => part
+          case None =>
+            throw new UnsupportedOperationException("Cannot do compaction on dropped partition")
+        }
+      var task: String = null
+      // split task Id is a combination of segment id(3 digits), task id (6 digits) and
+      // partition number (6 digits). Get the task Id from split task id
+      val partitionTaskId = split.taskId.substring(3, 9).toInt - Math.pow(10, 5).toInt

Review comment:
       If someone changes the task Id encoding, this extraction will silently break and no error will be reported.
   Maybe write a util method such as `getTaskIdfromUniqueNumber` below `generateUniqueNumber` for better maintainability.
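
   A hedged sketch of such a helper, mirroring the extraction in the diff above (the name and placement follow the suggestion; the 10^5 offset is assumed to be the one added while generating the unique number):

```scala
/**
 * Extract the task id portion from a unique number produced by
 * generateUniqueNumber, which concatenates the segment id (3 digits),
 * the task id (6 digits) and the partition number (6 digits).
 * Keeping it next to generateUniqueNumber means a change to the
 * encoding only has to be made in one place.
 */
def getTaskIdfromUniqueNumber(uniqueNumber: String): Int = {
  // digits 3..8 hold the task id; remove the 10^5 offset added
  // while generating the unique number
  uniqueNumber.substring(3, 9).toInt - math.pow(10, 5).toInt
}
```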

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
##########
@@ -712,10 +731,63 @@ class CarbonMergerRDD[K, V](
     }
   }
 
+  /**
+   * Group the data files based on task Id per partition
+   */
+  private def getTaskNumberBasedOnID(
+      split: CarbonInputSplit,
+      partitionTaskMap: util.Map[PartitionSpec, util.HashMap[String, Int]],
+      counter: AtomicInteger): String = {
+    if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+      val path = split.getPath.getParent
+      val partTask =
+        carbonMergerMapping.currentPartitions.get.find(p => p.getLocation.equals(path)) match {
+          case Some(part) => part
+          case None =>
+            throw new UnsupportedOperationException("Cannot do compaction on dropped partition")
+        }
+      var task: String = null
+      // split task Id is a combination of segment id(3 digits), task id (6 digits) and
+      // partition number (6 digits). Get the task Id from split task id
+      val partitionTaskId = split.taskId.substring(3, 9).toInt - Math.pow(10, 5).toInt
+      if (partitionTaskMap.isEmpty || partitionTaskMap.get(partTask) == null) {
+        if (partitionTaskMap.isEmpty) {
+          task = counter.toString
+        } else {
+          task = counter.incrementAndGet().toString
+        }
+        val partitionTaskList = new util.HashMap[String, Int]()
+        partitionTaskList.put(task, partitionTaskId)
+        partitionTaskMap.put(partTask, partitionTaskList)
+      } else {
+        val taskMap = partitionTaskMap.get(partTask)
+        if (taskMap.containsValue(partitionTaskId)) {
+          task = taskMap.asScala.find(_._2.equals(partitionTaskId)).get._1
+        } else {
+          task = counter.incrementAndGet().toString
+          taskMap.put(task, partitionTaskId)
+        }
+        partitionTaskMap.put(partTask, taskMap)
+      }
+      task
+    } else {
+      split.taskId
+    }
+  }
+
   private def getPartitionNamesFromTask(taskId: String,
-      partitionTaskMap: util.Map[PartitionSpec, String]): Option[PartitionSpec] = {
+      partitionTaskMap: util.Map[PartitionSpec, String],
+      partitionTaskMapBasedOnId: util.Map[PartitionSpec, util.HashMap[String, Int]],

Review comment:
       If `partitionTaskMapBasedOnId` is not empty we can take the new flow, otherwise the old flow. Why are the other two arguments required?
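
   For illustration, a hedged sketch of the suggested shape (names taken from the diff above; the lookup inside each branch is only a guess at the existing logic):

```scala
import java.util
import scala.collection.JavaConverters._
import org.apache.carbondata.core.indexstore.PartitionSpec

private def getPartitionNamesFromTask(
    taskId: String,
    partitionTaskMap: util.Map[PartitionSpec, String],
    partitionTaskMapBasedOnId: util.Map[PartitionSpec, util.HashMap[String, Int]])
  : Option[PartitionSpec] = {
  if (!partitionTaskMapBasedOnId.isEmpty) {
    // new flow: the partition whose task map contains this task id
    partitionTaskMapBasedOnId.asScala
      .find { case (_, taskMap) => taskMap.containsKey(taskId) }
      .map(_._1)
  } else {
    // old flow: direct task id -> partition lookup
    partitionTaskMap.asScala.find(_._2.equals(taskId)).map(_._1)
  }
}
```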




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org