You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/09 15:50:21 UTC
[19/45] carbondata git commit: [CARBONDATA-2978] Fixed JVM crash
issue when insert into carbon table from other carbon table
[CARBONDATA-2978] Fixed JVM crash issue when insert into carbon table from other carbon table
Problem:
When data is inserted from one carbon table into another carbon table, and both unsafe load and unsafe query execution are enabled, a JVM crash occurs.
Reason: When an insert happens from one carbon table to another carbon table, both the load and the query run in the same task and thread, so they
get the same task id; the unsafe memory manager then tries to release all memory acquired by that task even though the load is still running on it.
Solution:
Check the registered task-completion listeners: if a load listener is present for the task, skip freeing the unsafe memory (cache clearing) so the in-progress load is not affected.
This closes #2773
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9ae91cc5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9ae91cc5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9ae91cc5
Branch: refs/heads/branch-1.5
Commit: 9ae91cc5a9d683ef54550cfe7e65c4d63d5e5a24
Parents: c016361
Author: ravipesala <ra...@gmail.com>
Authored: Wed Sep 26 23:04:59 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Fri Sep 28 19:51:06 2018 +0530
----------------------------------------------------------------------
.../hadoop/api/CarbonTableOutputFormat.java | 35 +++++----
.../InsertIntoNonCarbonTableTestCase.scala | 79 +++++++++++++++++++-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 76 ++++++++++++-------
.../rdd/InsertTaskCompletionListener.scala | 4 +-
.../spark/rdd/QueryTaskCompletionListener.scala | 4 +-
.../datasources/SparkCarbonFileFormat.scala | 23 +++++-
.../CarbonTaskCompletionListener.scala | 72 ++++++++++++++++++
7 files changed, 246 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 28817e9..762983b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -424,6 +424,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
private Future future;
+ private boolean isClosed;
+
public CarbonRecordWriter(CarbonOutputIteratorWrapper iteratorWrapper,
DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future,
ExecutorService executorService) {
@@ -442,22 +444,25 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
}
@Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException {
- if (iteratorWrapper != null) {
- iteratorWrapper.closeWriter(false);
- }
- try {
- future.get();
- } catch (ExecutionException e) {
- LOG.error("Error while loading data", e);
- throw new InterruptedException(e.getMessage());
- } finally {
- executorService.shutdownNow();
- dataLoadExecutor.close();
- ThreadLocalSessionInfo.unsetAll();
- // clean up the folders and files created locally for data load operation
- TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
+ if (!isClosed) {
+ isClosed = true;
+ if (iteratorWrapper != null) {
+ iteratorWrapper.closeWriter(false);
+ }
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ LOG.error("Error while loading data", e);
+ throw new InterruptedException(e.getMessage());
+ } finally {
+ executorService.shutdownNow();
+ dataLoadExecutor.close();
+ ThreadLocalSessionInfo.unsetAll();
+ // clean up the folders and files created locally for data load operation
+ TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
+ }
+ LOG.info("Closed writer task " + taskAttemptContext.getTaskAttemptID());
}
- LOG.info("Closed writer task " + taskAttemptContext.getTaskAttemptID());
}
public CarbonLoadModel getLoadModel() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala
index a745672..a3fb11c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala
@@ -18,10 +18,13 @@
*/
package org.apache.carbondata.spark.testsuite.insertQuery
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
class InsertIntoNonCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
@@ -64,6 +67,8 @@ class InsertIntoNonCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
"Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions," +
"Latest_operatorId,gamePointDescription,gamePointId,contractNumber', " +
"'bad_records_logger_enable'='false','bad_records_action'='FORCE')")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION, "true")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
}
test("insert into hive") {
@@ -102,7 +107,79 @@ class InsertIntoNonCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
sql("drop table thive_cond")
}
+ test("jvm crash when insert data from datasource table to session table") {
+ val spark = sqlContext.sparkSession
+ import spark.implicits._
+
+ import scala.util.Random
+ val r = new Random()
+ val df = spark.sparkContext.parallelize(1 to 10)
+ .map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60)))
+ .toDF("ID", "name", "city", "age")
+ spark.sql("DROP TABLE IF EXISTS personTable")
+ spark.sql("DROP TABLE IF EXISTS test_table")
+
+ df.write.format("carbon").saveAsTable("personTable")
+ spark.sql("create table test_table(ID int, name string, city string, age decimal) stored by 'carbondata' tblproperties('sort_columns'='ID')")
+ spark.sql("insert into test_table select * from personTable")
+ spark.sql("insert into test_table select * from personTable limit 2")
+
+ assert(spark.sql("select * from test_table").count() == 12)
+ spark.sql("DROP TABLE IF EXISTS personTable")
+ spark.sql("DROP TABLE IF EXISTS test_table")
+ }
+
+ test("jvm crash when insert data from datasource table to datasource table") {
+ val spark = sqlContext.sparkSession
+ import spark.implicits._
+
+ import scala.util.Random
+ val r = new Random()
+ val df = spark.sparkContext.parallelize(1 to 10)
+ .map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60)))
+ .toDF("ID", "name", "city", "age")
+ spark.sql("DROP TABLE IF EXISTS personTable")
+ spark.sql("DROP TABLE IF EXISTS test_table")
+
+ df.write.format("carbon").saveAsTable("personTable")
+ spark.sql("create table test_table(ID int, name string, city string, age decimal) using carbon")
+ spark.sql("insert into test_table select * from personTable")
+ spark.sql("insert into test_table select * from personTable limit 2")
+
+ assert(spark.sql("select * from test_table").count() == 12)
+ spark.sql("DROP TABLE IF EXISTS personTable")
+ spark.sql("DROP TABLE IF EXISTS test_table")
+ }
+
+ test("jvm crash when insert data from session table to datasource table") {
+ val spark = sqlContext.sparkSession
+ import spark.implicits._
+
+ import scala.util.Random
+ val r = new Random()
+ val df = spark.sparkContext.parallelize(1 to 10)
+ .map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60)))
+ .toDF("ID", "name", "city", "age")
+ spark.sql("DROP TABLE IF EXISTS personTable")
+ spark.sql("DROP TABLE IF EXISTS test_table")
+
+ df.write
+ .format("carbondata")
+ .option("tableName", "personTable")
+ .mode(SaveMode.Overwrite)
+ .save()
+ spark.sql("create table test_table(ID int, name string, city string, age decimal) using carbon")
+ spark.sql("insert into test_table select * from personTable")
+ spark.sql("insert into test_table select * from personTable limit 2")
+
+ assert(spark.sql("select * from test_table").count() == 12)
+ spark.sql("DROP TABLE IF EXISTS personTable")
+ spark.sql("DROP TABLE IF EXISTS test_table")
+ }
+
override def afterAll {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION, CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION_DEFAULTVALUE)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
sql("DROP TABLE IF EXISTS TCarbonSource")
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index eb7abbc..1a7eae2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -35,6 +35,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.profiler.{GetPartition, Profiler, QueryTaskEnd}
import org.apache.spark.sql.util.SparkSQLUtil.sessionState
@@ -470,39 +471,28 @@ class CarbonScanRDD[T: ClassTag](
val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId())
model.setStatisticsRecorder(recorder)
- // TODO: rewrite this logic to call free memory in FailureListener on failures. On success,
- // TODO: no memory leak should be there, resources should be freed on success completion.
- val onCompleteCallbacksField = context.getClass.getDeclaredField("onCompleteCallbacks")
- onCompleteCallbacksField.setAccessible(true)
- val listeners = onCompleteCallbacksField.get(context)
- .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
-
- val isAdded = listeners.exists(p => p.isInstanceOf[InsertTaskCompletionListener])
- model.setFreeUnsafeMemory(!isAdded)
- // add task completion before calling initialize as initialize method will internally call
- // for usage of unsafe method for processing of one blocklet and if there is any exception
- // while doing that the unsafe memory occupied for that task will not get cleared
- context.addTaskCompletionListener { new QueryTaskCompletionListener(!isAdded,
- reader,
- inputMetricsStats,
- executionId,
- taskId,
- queryStartTime,
- model.getStatisticsRecorder,
- split,
- queryId)
- }
- // initialize the reader
- reader.initialize(inputSplit, attemptContext)
-
new Iterator[Any] {
private var havePair = false
private var finished = false
+ private var first = true
override def hasNext: Boolean = {
if (context.isInterrupted) {
throw new TaskKilledException
}
+ if (first) {
+ first = false
+ addTaskCompletionListener(
+ split,
+ context,
+ queryStartTime,
+ executionId,
+ taskId,
+ model,
+ reader)
+ // initialize the reader
+ reader.initialize(inputSplit, attemptContext)
+ }
if (!finished && !havePair) {
finished = !reader.nextKeyValue
havePair = !finished
@@ -534,6 +524,42 @@ class CarbonScanRDD[T: ClassTag](
iterator.asInstanceOf[Iterator[T]]
}
+ private def addTaskCompletionListener(split: Partition,
+ context: TaskContext,
+ queryStartTime: Long,
+ executionId: String,
+ taskId: Int,
+ model: QueryModel,
+ reader: RecordReader[Void, Object]) = {
+ // TODO: rewrite this logic to call free memory in FailureListener on failures and
+ // On success,
+ // TODO: no memory leak should be there, resources should be freed on
+ // success completion.
+ val onCompleteCallbacksField =
+ context.getClass.getDeclaredField("onCompleteCallbacks")
+ onCompleteCallbacksField.setAccessible(true)
+ val listeners = onCompleteCallbacksField.get(context)
+ .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
+
+ val isAdded = listeners.exists(p => p.isInstanceOf[CarbonLoadTaskCompletionListener])
+ model.setFreeUnsafeMemory(!isAdded)
+ // add task completion before calling initialize as initialize method will internally
+ // call for usage of unsafe method for processing of one blocklet and if there is any
+ // exceptionwhile doing that the unsafe memory occupied for that task will not
+ // get cleared
+ context.addTaskCompletionListener {
+ new QueryTaskCompletionListener(!isAdded,
+ reader,
+ inputMetricsStats,
+ executionId,
+ taskId,
+ queryStartTime,
+ model.getStatisticsRecorder,
+ split,
+ queryId)
+ }
+ }
+
private def close() {
TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId)
inputMetricsStats.updateAndClose()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
index dfdbd19..7246645 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
@@ -18,8 +18,8 @@
package org.apache.carbondata.spark.rdd
import org.apache.spark.TaskContext
+import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener
import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.util.TaskCompletionListener
import org.apache.carbondata.core.util.ThreadLocalTaskInfo
import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
@@ -27,7 +27,7 @@ import org.apache.carbondata.spark.util.CommonUtil
class InsertTaskCompletionListener(dataLoadExecutor: DataLoadExecutor,
executorErrors: ExecutionErrors)
- extends TaskCompletionListener {
+ extends CarbonLoadTaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = {
try {
dataLoadExecutor.close()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
index e4cb3f8..97449c5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
@@ -21,8 +21,8 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonQueryTaskCompletionListener
import org.apache.spark.sql.profiler.{Profiler, QueryTaskEnd}
-import org.apache.spark.util.TaskCompletionListener
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.memory.UnsafeMemoryManager
@@ -34,7 +34,7 @@ class QueryTaskCompletionListener(freeMemory: Boolean,
var reader: RecordReader[Void, Object],
inputMetricsStats: InitInputMetrics, executionId: String, taskId: Int, queryStartTime: Long,
queryStatisticsRecorder: QueryStatisticsRecorder, split: Partition, queryId: String)
- extends TaskCompletionListener {
+ extends CarbonQueryTaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = {
if (reader != null) {
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index a6965ac..53b1bb1 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.carbondata.execution.datasources
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -29,6 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql._
import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
+import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.{CarbonLoadTaskCompletionListener, CarbonLoadTaskCompletionListenerImpl, CarbonQueryTaskCompletionListener, CarbonQueryTaskCompletionListenerImpl}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -37,7 +39,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SparkTypeConverter
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, TaskCompletionListener}
import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -174,6 +176,10 @@ class SparkCarbonFileFormat extends FileFormat
private val recordWriter: RecordWriter[NullWritable, ObjectArrayWritable] =
new CarbonTableOutputFormat().getRecordWriter(context)
+ Option(TaskContext.get()).foreach {c =>
+ c.addTaskCompletionListener(CarbonLoadTaskCompletionListenerImpl(recordWriter, context))
+ }
+
/**
* Write sparks internal row to carbondata record writer
*/
@@ -388,6 +394,15 @@ class SparkCarbonFileFormat extends FileFormat
val model = format.createQueryModel(split, hadoopAttemptContext)
model.setConverter(new SparkDataTypeConverterImpl)
model.setPreFetchData(false)
+ var isAdded = false
+ Option(TaskContext.get()).foreach { context =>
+ val onCompleteCallbacksField = context.getClass.getDeclaredField("onCompleteCallbacks")
+ onCompleteCallbacksField.setAccessible(true)
+ val listeners = onCompleteCallbacksField.get(context)
+ .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
+ isAdded = listeners.exists(p => p.isInstanceOf[CarbonLoadTaskCompletionListener])
+ model.setFreeUnsafeMemory(!isAdded)
+ }
val carbonReader = if (readVector) {
val vectorizedReader = new VectorizedCarbonRecordReader(model,
null,
@@ -404,7 +419,11 @@ class SparkCarbonFileFormat extends FileFormat
}
val iter = new RecordReaderIterator(carbonReader)
- Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+ Option(TaskContext.get()).foreach{context =>
+ context.addTaskCompletionListener(
+ CarbonQueryTaskCompletionListenerImpl(
+ iter.asInstanceOf[RecordReaderIterator[InternalRow]], !isAdded))
+ }
if (carbonReader.isInstanceOf[VectorizedCarbonRecordReader] && readVector) {
iter.asInstanceOf[Iterator[InternalRow]]
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
new file mode 100644
index 0000000..9d889d4
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.carbondata.execution.datasources.tasklisteners
+
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.RecordReaderIterator
+import org.apache.spark.util.TaskCompletionListener
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.memory.UnsafeMemoryManager
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+
+/**
+ * Query completion listener
+ */
+trait CarbonQueryTaskCompletionListener extends TaskCompletionListener
+
+/**
+ * Load completion listener
+ */
+trait CarbonLoadTaskCompletionListener extends TaskCompletionListener
+
+case class CarbonQueryTaskCompletionListenerImpl(iter: RecordReaderIterator[InternalRow],
+ freeMemory: Boolean) extends CarbonQueryTaskCompletionListener {
+ override def onTaskCompletion(context: TaskContext): Unit = {
+ if (iter != null) {
+ try {
+ iter.close()
+ } catch {
+ case e: Exception =>
+ LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(e)
+ }
+ }
+ if (freeMemory) {
+ UnsafeMemoryManager.INSTANCE
+ .freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+ }
+ }
+}
+
+case class CarbonLoadTaskCompletionListenerImpl(recordWriter: RecordWriter[NullWritable,
+ ObjectArrayWritable],
+ taskAttemptContext: TaskAttemptContext) extends CarbonLoadTaskCompletionListener {
+
+ override def onTaskCompletion(context: TaskContext): Unit = {
+ try {
+ recordWriter.close(taskAttemptContext)
+ } finally {
+ UnsafeMemoryManager.INSTANCE
+ .freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+ }
+ }
+}