Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/09 15:50:21 UTC

[19/45] carbondata git commit: [CARBONDATA-2978] Fixed JVM crash when inserting into a carbon table from another carbon table

[CARBONDATA-2978] Fixed JVM crash when inserting into a carbon table from another carbon table

Problem:
When data is inserted from one carbon table into another carbon table while unsafe load and unsafe query execution are enabled, the JVM crashes.
Reason: Such an insert runs the query (read) side and the load (write) side in the same Spark task and thread, so both sides share the same task id. On task completion the unsafe memory manager frees all memory acquired under that task id, including memory the load side is still using, which crashes the JVM.

Solution:
Before freeing unsafe memory on query completion, check the task's registered completion listeners; if a load listener is already attached to the same task, skip releasing that task's memory (see the sketch below).
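
For illustration, a minimal self-contained Scala sketch of that check follows. TaskListener, LoadListener, QueryListener and queryListenerFor are invented names for this sketch only; the real classes (CarbonLoadTaskCompletionListener, QueryTaskCompletionListener) appear in the diff below.

object FreeMemoryOnlyWhenSafe {
  trait TaskListener { def onTaskCompletion(taskId: Long): Unit }

  // Stand-in for the load-side listener registered by the record writer.
  class LoadListener extends TaskListener {
    override def onTaskCompletion(taskId: Long): Unit =
      println(s"task $taskId: load finished, writer releases its own memory")
  }

  // Stand-in for the query-side listener; freeMemory mirrors the
  // setFreeUnsafeMemory flag used in the real patch.
  class QueryListener(freeMemory: Boolean) extends TaskListener {
    override def onTaskCompletion(taskId: Long): Unit =
      if (freeMemory) println(s"task $taskId: freeing unsafe memory")
      else println(s"task $taskId: a load shares this task, not freeing its memory")
  }

  // Decide whether the query may free memory by inspecting the listeners
  // already registered on the same task.
  def queryListenerFor(registered: Seq[TaskListener]): QueryListener =
    new QueryListener(freeMemory = !registered.exists(_.isInstanceOf[LoadListener]))

  def main(args: Array[String]): Unit = {
    val registered = Seq(new LoadListener)
    (registered :+ queryListenerFor(registered)).foreach(_.onTaskCompletion(1L))
  }
}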

This closes #2773


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9ae91cc5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9ae91cc5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9ae91cc5

Branch: refs/heads/branch-1.5
Commit: 9ae91cc5a9d683ef54550cfe7e65c4d63d5e5a24
Parents: c016361
Author: ravipesala <ra...@gmail.com>
Authored: Wed Sep 26 23:04:59 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Fri Sep 28 19:51:06 2018 +0530

----------------------------------------------------------------------
 .../hadoop/api/CarbonTableOutputFormat.java     | 35 +++++----
 .../InsertIntoNonCarbonTableTestCase.scala      | 79 +++++++++++++++++++-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 76 ++++++++++++-------
 .../rdd/InsertTaskCompletionListener.scala      |  4 +-
 .../spark/rdd/QueryTaskCompletionListener.scala |  4 +-
 .../datasources/SparkCarbonFileFormat.scala     | 23 +++++-
 .../CarbonTaskCompletionListener.scala          | 72 ++++++++++++++++++
 7 files changed, 246 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 28817e9..762983b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -424,6 +424,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
 
     private Future future;
 
+    private boolean isClosed;
+
     public CarbonRecordWriter(CarbonOutputIteratorWrapper iteratorWrapper,
         DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future,
         ExecutorService executorService) {
@@ -442,22 +444,25 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     }
 
     @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException {
-      if (iteratorWrapper != null) {
-        iteratorWrapper.closeWriter(false);
-      }
-      try {
-        future.get();
-      } catch (ExecutionException e) {
-        LOG.error("Error while loading data", e);
-        throw new InterruptedException(e.getMessage());
-      } finally {
-        executorService.shutdownNow();
-        dataLoadExecutor.close();
-        ThreadLocalSessionInfo.unsetAll();
-        // clean up the folders and files created locally for data load operation
-        TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
+      if (!isClosed) {
+        isClosed = true;
+        if (iteratorWrapper != null) {
+          iteratorWrapper.closeWriter(false);
+        }
+        try {
+          future.get();
+        } catch (ExecutionException e) {
+          LOG.error("Error while loading data", e);
+          throw new InterruptedException(e.getMessage());
+        } finally {
+          executorService.shutdownNow();
+          dataLoadExecutor.close();
+          ThreadLocalSessionInfo.unsetAll();
+          // clean up the folders and files created locally for data load operation
+          TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
+        }
+        LOG.info("Closed writer task " + taskAttemptContext.getTaskAttemptID());
       }
-      LOG.info("Closed writer task " + taskAttemptContext.getTaskAttemptID());
     }
 
     public CarbonLoadModel getLoadModel() {

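The hunk above guards close() with an isClosed flag because the record writer can now be closed twice: once by the new load task completion listener and once by the normal commit path. A generic sketch of that idempotent-close pattern, using an invented Writer class rather than CarbonRecordWriter:

// Idempotent close: only the first call does the real work.
class Writer {
  private var isClosed = false

  def close(): Unit = {
    if (!isClosed) {
      isClosed = true // flip first so a re-entrant call becomes a no-op
      try {
        println("flush pending rows and wait for the load to finish")
      } finally {
        println("release the executor service and local load folders exactly once")
      }
    }
  }
}

// Both the completion listener and the commit path may call close():
// val w = new Writer; w.close(); w.close() // second call is a no-op
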
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala
index a745672..a3fb11c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala
@@ -18,10 +18,13 @@
  */
 package org.apache.carbondata.spark.testsuite.insertQuery
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
 
 class InsertIntoNonCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   override def beforeAll {
@@ -64,6 +67,8 @@ class InsertIntoNonCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
       "Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions," +
       "Latest_operatorId,gamePointDescription,gamePointId,contractNumber', " +
       "'bad_records_logger_enable'='false','bad_records_action'='FORCE')")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
   }
 
   test("insert into hive") {
@@ -102,7 +107,79 @@ class InsertIntoNonCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
     sql("drop table thive_cond")
   }
 
+  test("jvm crash when insert data from datasource table to session table") {
+    val spark = sqlContext.sparkSession
+    import spark.implicits._
+
+    import scala.util.Random
+    val r = new Random()
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60)))
+      .toDF("ID", "name", "city", "age")
+    spark.sql("DROP TABLE IF EXISTS personTable")
+    spark.sql("DROP TABLE IF EXISTS test_table")
+
+    df.write.format("carbon").saveAsTable("personTable")
+    spark.sql("create table test_table(ID int, name string, city string, age decimal) stored by 'carbondata' tblproperties('sort_columns'='ID')")
+    spark.sql("insert into test_table select * from personTable")
+    spark.sql("insert into test_table select * from personTable limit 2")
+
+    assert(spark.sql("select * from test_table").count() == 12)
+    spark.sql("DROP TABLE IF EXISTS personTable")
+    spark.sql("DROP TABLE IF EXISTS test_table")
+  }
+
+  test("jvm crash when insert data from datasource table to datasource table") {
+    val spark = sqlContext.sparkSession
+    import spark.implicits._
+
+    import scala.util.Random
+    val r = new Random()
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60)))
+      .toDF("ID", "name", "city", "age")
+    spark.sql("DROP TABLE IF EXISTS personTable")
+    spark.sql("DROP TABLE IF EXISTS test_table")
+
+    df.write.format("carbon").saveAsTable("personTable")
+    spark.sql("create table test_table(ID int, name string, city string, age decimal) using carbon")
+    spark.sql("insert into test_table select * from personTable")
+    spark.sql("insert into test_table select * from personTable limit 2")
+
+    assert(spark.sql("select * from test_table").count() == 12)
+    spark.sql("DROP TABLE IF EXISTS personTable")
+    spark.sql("DROP TABLE IF EXISTS test_table")
+  }
+
+  test("jvm crash when insert data from session table to datasource table") {
+    val spark = sqlContext.sparkSession
+    import spark.implicits._
+
+    import scala.util.Random
+    val r = new Random()
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60)))
+      .toDF("ID", "name", "city", "age")
+    spark.sql("DROP TABLE IF EXISTS personTable")
+    spark.sql("DROP TABLE IF EXISTS test_table")
+
+    df.write
+      .format("carbondata")
+      .option("tableName", "personTable")
+      .mode(SaveMode.Overwrite)
+      .save()
+    spark.sql("create table test_table(ID int, name string, city string, age decimal) using carbon")
+    spark.sql("insert into test_table select * from personTable")
+    spark.sql("insert into test_table select * from personTable limit 2")
+
+    assert(spark.sql("select * from test_table").count() == 12)
+    spark.sql("DROP TABLE IF EXISTS personTable")
+    spark.sql("DROP TABLE IF EXISTS test_table")
+  }
+
   override def afterAll {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION, CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION_DEFAULTVALUE)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
     sql("DROP TABLE IF EXISTS TCarbonSource")
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index eb7abbc..1a7eae2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -35,6 +35,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.profiler.{GetPartition, Profiler, QueryTaskEnd}
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
@@ -470,39 +471,28 @@ class CarbonScanRDD[T: ClassTag](
       val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId())
       model.setStatisticsRecorder(recorder)
 
-      // TODO: rewrite this logic to call free memory in FailureListener on failures. On success,
-      // TODO: no memory leak should be there, resources should be freed on success completion.
-      val onCompleteCallbacksField = context.getClass.getDeclaredField("onCompleteCallbacks")
-      onCompleteCallbacksField.setAccessible(true)
-      val listeners = onCompleteCallbacksField.get(context)
-        .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
-
-      val isAdded = listeners.exists(p => p.isInstanceOf[InsertTaskCompletionListener])
-      model.setFreeUnsafeMemory(!isAdded)
-      // add task completion before calling initialize as initialize method will internally call
-      // for usage of unsafe method for processing of one blocklet and if there is any exception
-      // while doing that the unsafe memory occupied for that task will not get cleared
-      context.addTaskCompletionListener { new QueryTaskCompletionListener(!isAdded,
-        reader,
-        inputMetricsStats,
-        executionId,
-        taskId,
-        queryStartTime,
-        model.getStatisticsRecorder,
-        split,
-        queryId)
-      }
-      // initialize the reader
-      reader.initialize(inputSplit, attemptContext)
-
       new Iterator[Any] {
         private var havePair = false
         private var finished = false
+        private var first = true
 
         override def hasNext: Boolean = {
           if (context.isInterrupted) {
             throw new TaskKilledException
           }
+          if (first) {
+            first = false
+            addTaskCompletionListener(
+              split,
+              context,
+              queryStartTime,
+              executionId,
+              taskId,
+              model,
+              reader)
+            // initialize the reader
+            reader.initialize(inputSplit, attemptContext)
+          }
           if (!finished && !havePair) {
             finished = !reader.nextKeyValue
             havePair = !finished
@@ -534,6 +524,42 @@ class CarbonScanRDD[T: ClassTag](
     iterator.asInstanceOf[Iterator[T]]
   }
 
+  private def addTaskCompletionListener(split: Partition,
+      context: TaskContext,
+      queryStartTime: Long,
+      executionId: String,
+      taskId: Int,
+      model: QueryModel,
+      reader: RecordReader[Void, Object]) = {
+    // TODO: rewrite this logic to call free memory in FailureListener on failures and
+    // On success,
+    // TODO: no memory leak should be there, resources should be freed on
+    // success completion.
+    val onCompleteCallbacksField =
+    context.getClass.getDeclaredField("onCompleteCallbacks")
+    onCompleteCallbacksField.setAccessible(true)
+    val listeners = onCompleteCallbacksField.get(context)
+      .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
+
+    val isAdded = listeners.exists(p => p.isInstanceOf[CarbonLoadTaskCompletionListener])
+    model.setFreeUnsafeMemory(!isAdded)
+    // add task completion before calling initialize as initialize method will internally
+    // call for usage of unsafe method for processing of one blocklet and if there is any
+    // exceptionwhile doing that the unsafe memory occupied for that task will not
+    // get cleared
+    context.addTaskCompletionListener {
+      new QueryTaskCompletionListener(!isAdded,
+        reader,
+        inputMetricsStats,
+        executionId,
+        taskId,
+        queryStartTime,
+        model.getStatisticsRecorder,
+        split,
+        queryId)
+    }
+  }
+
   private def close() {
     TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId)
     inputMetricsStats.updateAndClose()

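The CarbonScanRDD change above moves listener registration and reader initialization into the first hasNext call, so the task's listener list is inspected only when the query side actually starts consuming data. A minimal sketch of that lazy first-call pattern, with invented names rather than the CarbonScanRDD internals:

// Lazy first-call setup: the expensive work runs inside the first hasNext
// instead of at iterator construction time.
class LazyIterator[T](setup: () => Unit, source: Iterator[T]) extends Iterator[T] {
  private var first = true

  override def hasNext: Boolean = {
    if (first) {
      first = false
      setup() // e.g. register the completion listener, then initialize the reader
    }
    source.hasNext
  }

  override def next(): T = source.next()
}

// The setup fires only if the task actually consumes the iterator:
// new LazyIterator(() => println("init"), Iterator(1, 2, 3)).foreach(println)
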
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
index dfdbd19..7246645 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
@@ -18,8 +18,8 @@
 package org.apache.carbondata.spark.rdd
 
 import org.apache.spark.TaskContext
+import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener
 import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.util.TaskCompletionListener
 
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo
 import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
@@ -27,7 +27,7 @@ import org.apache.carbondata.spark.util.CommonUtil
 
 class InsertTaskCompletionListener(dataLoadExecutor: DataLoadExecutor,
     executorErrors: ExecutionErrors)
-  extends TaskCompletionListener {
+  extends CarbonLoadTaskCompletionListener {
   override def onTaskCompletion(context: TaskContext): Unit = {
     try {
       dataLoadExecutor.close()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
index e4cb3f8..97449c5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
@@ -21,8 +21,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonQueryTaskCompletionListener
 import org.apache.spark.sql.profiler.{Profiler, QueryTaskEnd}
-import org.apache.spark.util.TaskCompletionListener
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.memory.UnsafeMemoryManager
@@ -34,7 +34,7 @@ class QueryTaskCompletionListener(freeMemory: Boolean,
     var reader: RecordReader[Void, Object],
     inputMetricsStats: InitInputMetrics, executionId: String, taskId: Int, queryStartTime: Long,
     queryStatisticsRecorder: QueryStatisticsRecorder, split: Partition, queryId: String)
-  extends TaskCompletionListener {
+  extends CarbonQueryTaskCompletionListener {
   override def onTaskCompletion(context: TaskContext): Unit = {
     if (reader != null) {
       try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index a6965ac..53b1bb1 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.carbondata.execution.datasources
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -29,6 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql._
 import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
+import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.{CarbonLoadTaskCompletionListener, CarbonLoadTaskCompletionListenerImpl, CarbonQueryTaskCompletionListener, CarbonQueryTaskCompletionListenerImpl}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -37,7 +39,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SparkTypeConverter
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, TaskCompletionListener}
 
 import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -174,6 +176,10 @@ class SparkCarbonFileFormat extends FileFormat
     private val recordWriter: RecordWriter[NullWritable, ObjectArrayWritable] =
       new CarbonTableOutputFormat().getRecordWriter(context)
 
+    Option(TaskContext.get()).foreach {c =>
+      c.addTaskCompletionListener(CarbonLoadTaskCompletionListenerImpl(recordWriter, context))
+    }
+
     /**
      * Write sparks internal row to carbondata record writer
      */
@@ -388,6 +394,15 @@ class SparkCarbonFileFormat extends FileFormat
         val model = format.createQueryModel(split, hadoopAttemptContext)
         model.setConverter(new SparkDataTypeConverterImpl)
         model.setPreFetchData(false)
+        var isAdded = false
+        Option(TaskContext.get()).foreach { context =>
+          val onCompleteCallbacksField = context.getClass.getDeclaredField("onCompleteCallbacks")
+          onCompleteCallbacksField.setAccessible(true)
+          val listeners = onCompleteCallbacksField.get(context)
+            .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
+          isAdded = listeners.exists(p => p.isInstanceOf[CarbonLoadTaskCompletionListener])
+          model.setFreeUnsafeMemory(!isAdded)
+        }
         val carbonReader = if (readVector) {
           val vectorizedReader = new VectorizedCarbonRecordReader(model,
             null,
@@ -404,7 +419,11 @@ class SparkCarbonFileFormat extends FileFormat
         }
 
         val iter = new RecordReaderIterator(carbonReader)
-        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+        Option(TaskContext.get()).foreach{context =>
+          context.addTaskCompletionListener(
+          CarbonQueryTaskCompletionListenerImpl(
+            iter.asInstanceOf[RecordReaderIterator[InternalRow]], !isAdded))
+        }
 
         if (carbonReader.isInstanceOf[VectorizedCarbonRecordReader] && readVector) {
           iter.asInstanceOf[Iterator[InternalRow]]

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
new file mode 100644
index 0000000..9d889d4
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.carbondata.execution.datasources.tasklisteners
+
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.util.TaskCompletionListener
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.memory.UnsafeMemoryManager
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+
+/**
+ * Query completion listener
+ */
+trait CarbonQueryTaskCompletionListener extends TaskCompletionListener
+
+/**
+ * Load completion listener
+ */
+trait CarbonLoadTaskCompletionListener extends TaskCompletionListener
+
+case class CarbonQueryTaskCompletionListenerImpl(iter: RecordReaderIterator[InternalRow],
+    freeMemory: Boolean) extends CarbonQueryTaskCompletionListener {
+  override def onTaskCompletion(context: TaskContext): Unit = {
+    if (iter != null) {
+      try {
+        iter.close()
+      } catch {
+        case e: Exception =>
+          LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(e)
+      }
+    }
+    if (freeMemory) {
+      UnsafeMemoryManager.INSTANCE
+        .freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+    }
+  }
+}
+
+case class CarbonLoadTaskCompletionListenerImpl(recordWriter: RecordWriter[NullWritable,
+  ObjectArrayWritable],
+    taskAttemptContext: TaskAttemptContext) extends CarbonLoadTaskCompletionListener {
+
+  override def onTaskCompletion(context: TaskContext): Unit = {
+    try {
+      recordWriter.close(taskAttemptContext)
+    } finally {
+      UnsafeMemoryManager.INSTANCE
+        .freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+    }
+  }
+}