You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/05 12:40:16 UTC

[8/8] carbondata git commit: [HOTFIX]Fixed test case failure due to bad record handling

[HOTFIX]Fixed test case failure due to bad record handling

This closes #2642


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b611a861
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b611a861
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b611a861

Branch: refs/heads/master
Commit: b611a861a9b0f997a58eb3eee3c9606576352217
Parents: 74c3eb1
Author: sandeep-katta <sa...@gmail.com>
Authored: Tue Sep 4 20:00:19 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Sep 5 18:09:45 2018 +0530

----------------------------------------------------------------------
 .../rdd/InsertTaskCompletionListener.scala      | 20 ++++++++++++++++----
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  7 ++++---
 2 files changed, 20 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b611a861/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
index 9439ae5..dfdbd19 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
@@ -18,16 +18,28 @@
 package org.apache.carbondata.spark.rdd
 
 import org.apache.spark.TaskContext
+import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.util.TaskCompletionListener
 
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo
-import org.apache.carbondata.processing.loading.DataLoadExecutor
+import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
 import org.apache.carbondata.spark.util.CommonUtil
 
-class InsertTaskCompletionListener(dataLoadExecutor: DataLoadExecutor)
+class InsertTaskCompletionListener(dataLoadExecutor: DataLoadExecutor,
+    executorErrors: ExecutionErrors)
   extends TaskCompletionListener {
   override def onTaskCompletion(context: TaskContext): Unit = {
-    dataLoadExecutor.close()
-    CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+    try {
+      dataLoadExecutor.close()
+    }
+    catch {
+      case e: Exception =>
+        if (executorErrors.failureCauses != FailureCauses.BAD_RECORDS) {
+          throw e
+        }
+    }
+    finally {
+      CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b611a861/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 38d6b48..87c8e4c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -207,8 +207,8 @@ class NewCarbonDataLoadRDD[K, V](
         loader.initialize()
         val executor = new DataLoadExecutor()
         // in case of success, failure or cancellation clear memory and stop execution
-        context.addTaskCompletionListener { context => executor.close()
-          CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)}
+        context
+          .addTaskCompletionListener { new InsertTaskCompletionListener(executor, executionErrors) }
         executor.execute(model,
           loader.storeLocation,
           recordReaders)
@@ -347,7 +347,8 @@ class NewDataFrameLoaderRDD[K, V](
         loader.initialize()
         val executor = new DataLoadExecutor
         // in case of success, failure or cancellation clear memory and stop execution
-        context.addTaskCompletionListener (new InsertTaskCompletionListener(executor))
+        context
+          .addTaskCompletionListener(new InsertTaskCompletionListener(executor, executionErrors))
         executor.execute(model, loader.storeLocation, recordReaders.toArray)
       } catch {
         case e: NoRetryException =>