Posted to commits@carbondata.apache.org by qi...@apache.org on 2020/11/09 08:31:16 UTC

[carbondata] branch master updated: [HOTFIX] Avoid calling SecondaryIndexUtil.readFileFooter() for every split identified during SI creation

This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new bfc9533  [HOTFIX] Avoid calling SecondaryIndexUtil.readFileFooter() for every split identified during SI creation
bfc9533 is described below

commit bfc9533ea479e9051a3ce67a58dae63c9af14b42
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Fri Nov 6 14:31:34 2020 +0530

    [HOTFIX] Avoid calling SecondaryIndexUtil.readFileFooter() for every split identified during SI creation
    
    Why is this PR needed?
    1. A redundant SecondaryIndexUtil.readFileFooter() call exists in SI creation.
    2. Some info logs can be changed to debug level so the loops that build them are avoided.

    What changes were proposed in this PR?
    1. Remove the SecondaryIndexUtil.readFileFooter() call from SI creation.
    2. Change some info logs to debug and guard their loops with isDebugEnabled (see the sketch after this commit message).
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #4006
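
A minimal, self-contained sketch of the logging change described above: the
loop-heavy debug output is produced only when the logger is actually at debug
level, so the per-node/per-task iteration and message building are skipped at
info level. This is an illustration only; it uses an slf4j logger rather than
CarbonData's LogServiceFactory, and the names DebugLogGuardSketch, NodeInfo and
logTaskBlocks are hypothetical, not part of the patch.

    import org.slf4j.{Logger, LoggerFactory}

    object DebugLogGuardSketch {
      private val LOGGER: Logger = LoggerFactory.getLogger(getClass.getName)

      // Stand-in for the per-task block counts kept in nodeTaskBlocksMap.
      final case class NodeInfo(taskId: String, noOfBlocks: Int)

      def logTaskBlocks(nodeTaskBlocks: Map[String, Seq[NodeInfo]]): Unit = {
        // Guard the whole loop: when debug logging is off, neither the
        // iteration nor the log-message construction happens.
        if (LOGGER.isDebugEnabled) {
          nodeTaskBlocks.foreach { case (node, infos) =>
            LOGGER.debug(s"for the node $node")
            infos.foreach { info =>
              LOGGER.debug(s"Task ID is ${info.taskId}, no. of blocks is ${info.noOfBlocks}")
            }
          }
        }
      }
    }

The same guard appears in the diff below around the nodeTaskBlocksMap loop and
the per-partition split loop.
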
---
 .../rdd/CarbonSecondaryIndexRDD.scala              | 48 ++++++++++------------
 1 file changed, 22 insertions(+), 26 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala
index 9196c9e..2b31995 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala
@@ -185,6 +185,7 @@ class CarbonSecondaryIndexRDD[K, V](
   }
 
   override def internalGetPartitions: Array[Partition] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
       carbonStoreLocation, databaseName, factTableName, tableId)
@@ -255,7 +256,7 @@ class CarbonSecondaryIndexRDD[K, V](
 
       val nodeTaskBlocksMap = new java.util.HashMap[String, java.util.List[NodeInfo]]()
       val nodes = DistributionUtil.getNodeList(sparkContext)
-      logInfo("no.of.nodes where data present=" + nodeBlockMap.size())
+      LOGGER.info("no.of.nodes where data present=" + nodeBlockMap.size())
       defaultParallelism = sparkContext.defaultParallelism
 
       // Create Spark Partition for each task and assign blocks
@@ -278,34 +279,29 @@ class CarbonSecondaryIndexRDD[K, V](
           }
         }
       }
-
-      // print the node info along with task and number of blocks for the task.
-      nodeTaskBlocksMap.asScala.foreach((entry: (String, util.List[NodeInfo])) => {
-        logInfo(s"for the node ${ entry._1 }")
-        for (elem <- entry._2.asScala) {
-          logInfo("Task ID is " + elem.TaskId + "no. of blocks is " + elem.noOfBlocks)
-        }
-      })
-
+      if (LOGGER.isDebugEnabled) {
+        // print the node info along with task and number of blocks for the task.
+        nodeTaskBlocksMap.asScala.foreach((entry: (String, util.List[NodeInfo])) => {
+          LOGGER.debug(s"for the node ${ entry._1 }")
+          for (elem <- entry._2.asScala) {
+            LOGGER.debug("Task ID is " + elem.TaskId + "no. of blocks is " + elem.noOfBlocks)
+          }
+        })
+      }
       val noOfNodes = nodes.length
       val noOfTasks = result.size
-      logInfo(s"Identified  no.of.Blocks: $noOfBlocks," +
-              s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: " +
-              s"$noOfTasks")
-      logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
-      for (j <- 0 until result.size) {
-        val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
-        val splitList = multiBlockSplit.getAllSplits
-        val tableBlocks: util.List[TableBlockInfo] = CarbonInputSplit.createBlocks(splitList)
-        val tableBlocksSize: Int = tableBlocks.size
-        if (tableBlocksSize > 0) {
-          // read the footer and get column cardinality which will be same for all tasks in a
-          // segment
-          val dataFileFooter: DataFileFooter = SecondaryIndexUtil
-            .readFileFooter(tableBlocks.get(tableBlocks.size() - 1))
+      LOGGER.info(s"Identified  no.of.Blocks: $noOfBlocks," +
+                  s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: " +
+                  s"$noOfTasks")
+      LOGGER.info(
+        "Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
+      if (LOGGER.isDebugEnabled) {
+        for (j <- 0 until result.size) {
+          val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
+          val splitList = multiBlockSplit.getAllSplits
+          LOGGER.debug(s"Node: ${ multiBlockSplit.getLocations.mkString(",") }, No.Of Blocks: " +
+                       s"${ CarbonInputSplit.createBlocks(splitList).size }")
         }
-        logInfo(s"Node: ${ multiBlockSplit.getLocations.mkString(",") }, No.Of Blocks: " +
-                s"${ CarbonInputSplit.createBlocks(splitList).size }")
       }
       result.toArray(new Array[Partition](result.size))
     } else {