Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/30 10:39:15 UTC
[carbondata] 11/27: [CARBONDATA-3265] Fixed memory leak in Range Sort
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit 7383da8743d1b86b36853f5d314e048b836a2a40
Author: shivamasn <sh...@gmail.com>
AuthorDate: Tue Jan 22 15:28:10 2019 +0530
[CARBONDATA-3265] Fixed memory leak in Range Sort
In range sort, unsafe memory was not getting cleared in case of a task failure,
so a fix for the memory leak has been added.
This closes #3095
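
The essence of the fix is to register a Spark task completion listener inside each
closure passed to sc.runJob, so that the off-heap (unsafe) memory a task has
allocated is released even when that task fails. Below is a minimal, self-contained
sketch of the same pattern against the plain Spark 2.x API; the println cleanup and
the simulated failure are illustrative stand-ins for the CarbonData-specific call
(CommonUtil.clearUnsafeMemory) used in the actual patch.

    import org.apache.spark.{SparkConf, SparkContext, TaskContext}

    object TaskCleanupSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("task-cleanup-sketch").setMaster("local[2]"))
        val rdd = sc.parallelize(1 to 10, 2)
        try {
          sc.runJob(rdd, (context: TaskContext, rows: Iterator[Int]) => {
            // Register the listener first: it fires when the task completes,
            // whether the task succeeds or fails.
            context.addTaskCompletionListener { _ =>
              // Stand-in for CommonUtil.clearUnsafeMemory(taskId) in the patch.
              println(s"freeing unsafe memory for partition ${context.partitionId}")
            }
            // Simulated task failure: the listener above still runs.
            if (context.partitionId == 0) {
              throw new RuntimeException("simulated task failure")
            }
            rows.size
          })
        } catch {
          case e: Exception => println("job failed as expected: " + e.getMessage)
        } finally {
          sc.stop()
        }
      }
    }

Because the listener runs for both successful and failed tasks, registering it at the
start of the write and sort closures (as setTaskListener does in the diff below)
ensures the per-task unsafe allocations are cleared in either case.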
---
.../spark/load/DataLoadProcessBuilderOnSpark.scala | 29 ++++++++++++++++------
1 file changed, 22 insertions(+), 7 deletions(-)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index a5d354a..77d0d84 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.spark.load
import java.util.Comparator
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Accumulator, DataSkewRangePartitioner, RangePartitioner, TaskContext}
+import org.apache.spark.{Accumulator, DataSkewRangePartitioner, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -29,18 +29,19 @@ import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.storage.StorageLevel
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util._
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer
import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, DataField, DataLoadProcessBuilder, FailureCauses}
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.util.CommonUtil
/**
* Use sortBy operator in spark to load the data
@@ -127,9 +128,11 @@ object DataLoadProcessBuilderOnSpark {
}
// 4. Write
- sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+ sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
+ setTaskListener()
DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast,
- writeStepRowCounter, conf.value.value))
+ writeStepRowCounter, conf.value.value)
+ })
// clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
// not have any functional impact as spark automatically monitors the cache usage on each node
@@ -221,9 +224,11 @@ object DataLoadProcessBuilderOnSpark {
.map(_._2)
// 4. Sort and Write data
- sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+ sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
+ setTaskListener()
DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
- writeStepRowCounter, conf.value.value))
+ writeStepRowCounter, conf.value.value)
+ })
// Log the number of rows in each step
LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
@@ -340,6 +345,16 @@ object DataLoadProcessBuilderOnSpark {
new PrimtiveOrdering(column.getDataType)
}
}
+
+ def setTaskListener(): Unit = {
+ TaskContext.get.addTaskCompletionListener { _ =>
+ CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+ }
+ TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
+ val carbonTaskInfo = new CarbonTaskInfo
+ carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
+ ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
+ }
}
class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] {