You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/05 12:40:16 UTC
[8/8] carbondata git commit: [HOTFIX] Fixed test case failure due to
bad record handling
[HOTFIX] Fixed test case failure due to bad record handling
This closes #2642
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b611a861
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b611a861
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b611a861
Branch: refs/heads/master
Commit: b611a861a9b0f997a58eb3eee3c9606576352217
Parents: 74c3eb1
Author: sandeep-katta <sa...@gmail.com>
Authored: Tue Sep 4 20:00:19 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Sep 5 18:09:45 2018 +0530
----------------------------------------------------------------------
.../rdd/InsertTaskCompletionListener.scala | 20 ++++++++++++++++----
.../spark/rdd/NewCarbonDataLoadRDD.scala | 7 ++++---
2 files changed, 20 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b611a861/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
index 9439ae5..dfdbd19 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
@@ -18,16 +18,28 @@
package org.apache.carbondata.spark.rdd
import org.apache.spark.TaskContext
+import org.apache.spark.sql.execution.command.ExecutionErrors
import org.apache.spark.util.TaskCompletionListener
import org.apache.carbondata.core.util.ThreadLocalTaskInfo
-import org.apache.carbondata.processing.loading.DataLoadExecutor
+import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
import org.apache.carbondata.spark.util.CommonUtil
-class InsertTaskCompletionListener(dataLoadExecutor: DataLoadExecutor)
+class InsertTaskCompletionListener(dataLoadExecutor: DataLoadExecutor,
+ executorErrors: ExecutionErrors)
extends TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = {
- dataLoadExecutor.close()
- CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+ try {
+ dataLoadExecutor.close()
+ }
+ catch {
+ case e: Exception =>
+ if (executorErrors.failureCauses != FailureCauses.BAD_RECORDS) {
+ throw e
+ }
+ }
+ finally {
+ CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b611a861/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 38d6b48..87c8e4c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -207,8 +207,8 @@ class NewCarbonDataLoadRDD[K, V](
loader.initialize()
val executor = new DataLoadExecutor()
// in case of success, failure or cancelation clear memory and stop execution
- context.addTaskCompletionListener { context => executor.close()
- CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)}
+ context
+ .addTaskCompletionListener { new InsertTaskCompletionListener(executor, executionErrors) }
executor.execute(model,
loader.storeLocation,
recordReaders)
@@ -347,7 +347,8 @@ class NewDataFrameLoaderRDD[K, V](
loader.initialize()
val executor = new DataLoadExecutor
// in case of success, failure or cancelation clear memory and stop execution
- context.addTaskCompletionListener (new InsertTaskCompletionListener(executor))
+ context
+ .addTaskCompletionListener(new InsertTaskCompletionListener(executor, executionErrors))
executor.execute(model, loader.storeLocation, recordReaders.toArray)
} catch {
case e: NoRetryException =>