You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/30 10:39:15 UTC

[carbondata] 11/27: [CARBONDATA-3265] Fixed memory leak in Range Sort

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 7383da8743d1b86b36853f5d314e048b836a2a40
Author: shivamasn <sh...@gmail.com>
AuthorDate: Tue Jan 22 15:28:10 2019 +0530

    [CARBONDATA-3265] Fixed memory leak in Range Sort
    
    In range sort, unsafe memory was not getting cleared in case of task failure.
    So, added a fix for memory leak.
    
    This closes #3095
---
 .../spark/load/DataLoadProcessBuilderOnSpark.scala | 29 ++++++++++++++++------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index a5d354a..77d0d84 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.spark.load
 import java.util.Comparator
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Accumulator, DataSkewRangePartitioner, RangePartitioner, TaskContext}
+import org.apache.spark.{Accumulator, DataSkewRangePartitioner, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -29,18 +29,19 @@ import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer
 import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, DataField, DataLoadProcessBuilder, FailureCauses}
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Use sortBy operator in spark to load the data
@@ -127,9 +128,11 @@ object DataLoadProcessBuilderOnSpark {
       }
 
     // 4. Write
-    sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+    sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
+      setTaskListener()
       DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast,
-        writeStepRowCounter, conf.value.value))
+        writeStepRowCounter, conf.value.value)
+    })
 
     // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
     // not have any functional impact as spark automatically monitors the cache usage on each node
@@ -221,9 +224,11 @@ object DataLoadProcessBuilderOnSpark {
       .map(_._2)
 
     // 4. Sort and Write data
-    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
+      setTaskListener()
       DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
-        writeStepRowCounter, conf.value.value))
+        writeStepRowCounter, conf.value.value)
+    })
 
     // Log the number of rows in each step
     LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
@@ -340,6 +345,16 @@ object DataLoadProcessBuilderOnSpark {
       new PrimtiveOrdering(column.getDataType)
     }
   }
+
+  def setTaskListener(): Unit = {
+    TaskContext.get.addTaskCompletionListener { _ =>
+      CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+    }
+    TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
+    val carbonTaskInfo = new CarbonTaskInfo
+    carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
+    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
+  }
 }
 
 class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] {