Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/07/06 14:41:46 UTC

[21/50] [abbrv] carbondata git commit: Problem: Executor lost failure in case of data load failure due to bad records

Problem: Executor lost failure in case of data load failure due to bad records

Analysis: When data loads containing bad records are run continuously, after some time the executor is lost due to an OOM error and the application is eventually restarted by YARN. This happens because, when a data load fails due to bad records, the executor throws an exception and the task keeps retrying until the maximum number of retry attempts is reached. As this repeats, YARN eventually restarts the application.

Fix: When the data load failure is known to be caused by bad records, i.e. an intentional failure raised by Carbon, the executor should not retry the data load; instead it should complete the job gracefully and hand the failure information to the driver for handling.
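
In outline, the fix has two halves: the executor records the bad-record failure in its task result instead of rethrowing, and the driver inspects that result and fails the load exactly once. A minimal, self-contained Scala sketch with stand-in types (illustration only, not the actual CarbonData classes):

    // Stand-ins for the real CarbonData types, for illustration only.
    object FailureCauses extends Enumeration { val NONE, BAD_RECORDS = Value }
    case class ExecutionErrors(var failureCauses: FailureCauses.Value, var errorMsg: String)
    class BadRecordFoundException(msg: String) extends Exception(msg)

    // Executor side: catch the intentional failure, record it, return normally.
    def runLoadTask(load: () => Unit): (String, ExecutionErrors) = {
      val errors = ExecutionErrors(FailureCauses.NONE, "")
      try {
        load()
        ("SUCCESS", errors)
      } catch {
        case e: BadRecordFoundException =>
          errors.failureCauses = FailureCauses.BAD_RECORDS
          errors.errorMsg = e.getMessage
          ("PARTIAL_SUCCESS", errors) // task completes, so Spark schedules no retry
      }
    }

    // Driver side: decide once, based on the reported cause and the configured action.
    def finishLoad(result: (String, ExecutionErrors), failOnBadRecords: Boolean): Unit = {
      val (status, errors) = result
      if (status == "PARTIAL_SUCCESS" &&
          errors.failureCauses == FailureCauses.BAD_RECORDS && failOnBadRecords) {
        throw new Exception(errors.errorMsg) // single, controlled failure on the driver
      }
    }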


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/105b7c34
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/105b7c34
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/105b7c34

Branch: refs/heads/branch-1.1
Commit: 105b7c3496db620390b804b87ac5eb5835b04176
Parents: 357ab63
Author: manishgupta88 <to...@gmail.com>
Authored: Tue Jun 6 12:18:35 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Jun 15 13:25:30 2017 +0530

----------------------------------------------------------------------
 .../org/apache/carbondata/spark/KeyVal.scala    |  8 ++-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 13 +++-
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |  5 --
 .../spark/rdd/CarbonDataRDDFactory.scala        | 54 ++++++++++++-----
 .../spark/rdd/CarbonDataRDDFactory.scala        | 63 ++++++++++++++------
 .../converter/impl/RowConverterImpl.java        |  3 +-
 .../exception/BadRecordFoundException.java      |  2 +-
 .../newflow/sort/impl/ThreadStatusObserver.java | 19 +++++-
 8 files changed, 120 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/105b7c34/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index ab5fc0b..31dd4e6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -48,11 +48,13 @@ class RawValueImpl extends RawValue[Array[Any]] {
 }
 
 trait DataLoadResult[K, V] extends Serializable {
-  def getKey(key: String, value: LoadMetadataDetails): (K, V)
+  def getKey(key: String, value: (LoadMetadataDetails, ExecutionErrors)): (K, V)
 }
 
-class DataLoadResultImpl extends DataLoadResult[String, LoadMetadataDetails] {
-  override def getKey(key: String, value: LoadMetadataDetails): (String, LoadMetadataDetails) = {
+class DataLoadResultImpl extends DataLoadResult[String, (LoadMetadataDetails, ExecutionErrors)] {
+  override def getKey(key: String,
+      value: (LoadMetadataDetails, ExecutionErrors)): (String, (LoadMetadataDetails,
+    ExecutionErrors)) = {
     (key, value)
   }
 }
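
With this change every task result pairs the load metadata with the executor's error information, so the driver can read the failure cause straight from the collected statuses. Reusing the stand-in types from the sketch above (LoadMetadataDetails is another illustrative stand-in):

    // Sketch: the driver-side status array now carries (metadata, errors) per task.
    class LoadMetadataDetails { var loadStatus: String = "SUCCESS" }

    def firstFailureCause(
        status: Array[(String, (LoadMetadataDetails, ExecutionErrors))]): FailureCauses.Value =
      status(0)._2._2.failureCauses // _2._1 is the metadata, _2._2 the errors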

http://git-wip-us.apache.org/repos/asf/carbondata/blob/105b7c34/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index a6d231d..6b30ed7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -34,6 +34,7 @@ import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.CarbonIterator
@@ -49,7 +50,7 @@ import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.DataLoadExecutor
 import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
 import org.apache.carbondata.spark.DataLoadResult
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.spark.load.{CarbonLoaderUtil, FailureCauses}
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil}
 
@@ -219,6 +220,7 @@ class NewCarbonDataLoadRDD[K, V](
     val iter = new Iterator[(K, V)] {
       var partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
       var model: CarbonLoadModel = _
       val uniqueLoadStatusId =
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
@@ -244,6 +246,8 @@ class NewCarbonDataLoadRDD[K, V](
       } catch {
         case e: BadRecordFoundException =>
           loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+          executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+          executionErrors.errorMsg = e.getMessage
           logInfo("Bad Record Found")
         case e: Exception =>
           loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
@@ -348,7 +352,7 @@ class NewCarbonDataLoadRDD[K, V](
 
       override def next(): (K, V) = {
         finished = true
-        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+        result.getKey(uniqueLoadStatusId, (loadMetadataDetails, executionErrors))
       }
     }
     iter
@@ -394,6 +398,7 @@ class NewDataFrameLoaderRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
       val model: CarbonLoadModel = carbonLoadModel
       val uniqueLoadStatusId =
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
@@ -430,6 +435,8 @@ class NewDataFrameLoaderRDD[K, V](
       } catch {
         case e: BadRecordFoundException =>
           loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+          executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+          executionErrors.errorMsg = e.getMessage
           logInfo("Bad Record Found")
         case e: Exception =>
           loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
@@ -453,7 +460,7 @@ class NewDataFrameLoaderRDD[K, V](
 
       override def next(): (K, V) = {
         finished = true
-        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+        result.getKey(uniqueLoadStatusId, (loadMetadataDetails, executionErrors))
       }
     }
     iter
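
Note how the per-partition result is delivered: the task's output iterator yields exactly one element, the (key, (metadata, errors)) pair, and then reports itself finished. A bare-bones version of that shape, again with the stand-in types from the sketches above:

    // Sketch: a single-element iterator carrying the load status of one partition.
    def statusIterator(id: String, metadata: LoadMetadataDetails, errors: ExecutionErrors)
        : Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] =
      new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
        private var finished = false
        override def hasNext: Boolean = !finished
        override def next(): (String, (LoadMetadataDetails, ExecutionErrors)) = {
          finished = true
          (id, (metadata, errors)) // emitted once, whether the load succeeded or not
        }
      }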

http://git-wip-us.apache.org/repos/asf/carbondata/blob/105b7c34/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index a36fb63..bcfc096 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.DataLoadExecutor
-import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
 
 /**
  * Data load in case of update command .
@@ -61,10 +60,6 @@ object UpdateDataLoad {
         recordReaders.toArray)
 
     } catch {
-      case e: BadRecordFoundException =>
-        loadMetadataDetails.setLoadStatus(
-          CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
-        LOGGER.info("Bad Record Found")
       case e: Exception =>
         LOGGER.error(e)
         throw e

http://git-wip-us.apache.org/repos/asf/carbondata/blob/105b7c34/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2922365..3d2e35b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -49,12 +49,14 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.csvload.BlockDetails
+import org.apache.carbondata.processing.constants.LoggerAction
+import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException}
 import org.apache.carbondata.spark._
-import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.load.{FailureCauses, _}
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil}
 
@@ -487,7 +489,7 @@ object CarbonDataRDDFactory {
       // CarbonCommonConstants.TABLE_SPLIT_PARTITION_DEFAULT_VALUE).toBoolean
       val isTableSplitPartition = false
       var blocksGroupBy: Array[(String, Array[BlockDetails])] = null
-      var status: Array[(String, LoadMetadataDetails)] = null
+      var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
       var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
 
       def loadDataFile(): Unit = {
@@ -688,6 +690,12 @@ object CarbonDataRDDFactory {
                 carbonLoadModel,
                 loadMetadataDetails)
             } catch {
+              case e: BadRecordFoundException =>
+                loadMetadataDetails
+                  .setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+                executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+                executionErrors.errorMsg = e.getMessage
+                LOGGER.info("Bad Record Found")
               case e: Exception =>
                 LOGGER.info("DataLoad failure")
                 LOGGER.error(e)
@@ -744,8 +752,7 @@ object CarbonDataRDDFactory {
           loadDataFrameForUpdate()
         } else if (dataFrame.isDefined) {
           loadDataFrame()
-        }
-        else {
+        } else {
           loadDataFile()
         }
         if (updateModel.isDefined) {
@@ -762,25 +769,30 @@ object CarbonDataRDDFactory {
                 else {
                   updateModel.get.executorErrors = resultOfBlock._2._2
                 }
+              } else if (resultOfBlock._2._1.getLoadStatus
+                .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
+                loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+                updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
+                updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
               }
             }
           ))
 
         }
         else {
-        val newStatusMap = scala.collection.mutable.Map.empty[String, String]
+          val newStatusMap = scala.collection.mutable.Map.empty[String, String]
         if (status.nonEmpty) {
           status.foreach { eachLoadStatus =>
             val state = newStatusMap.get(eachLoadStatus._1)
             state match {
               case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
               case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
-                if eachLoadStatus._2.getLoadStatus ==
+                if eachLoadStatus._2._1.getLoadStatus ==
                     CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
               case _ =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
             }
           }
 
@@ -833,8 +845,11 @@ object CarbonDataRDDFactory {
             }
           }
           return
-        }
-        else {
+        } else if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
+                   updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
+                   carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
+          return
+        } else {
           // in success case handle updation of the table status file.
           // success case.
           val segmentDetails = new util.HashSet[String]()
@@ -883,7 +898,7 @@ object CarbonDataRDDFactory {
 
         return
       }
-        LOGGER.info("********starting clean up**********")
+      LOGGER.info("********starting clean up**********")
       if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
         LOGGER.info("********clean up done**********")
@@ -892,7 +907,18 @@ object CarbonDataRDDFactory {
         LOGGER.warn("Cannot write load metadata file as data load failed")
         throw new Exception(errorMessage)
       } else {
-        val metadataDetails = status(0)._2
+        // check if data load fails due to bad record and throw data load failure due to
+        // bad record exception
+        if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
+            status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
+            carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
+          CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
+          LOGGER.info("********clean up done**********")
+          LOGGER.audit(s"Data load is failed for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          throw new Exception(status(0)._2._2.errorMsg)
+        }
+        val metadataDetails = status(0)._2._1
         if (!isAgg) {
             writeDictionary(carbonLoadModel, result, false)
             val status = CarbonLoaderUtil.recordLoadMetadata(currentLoadCount, metadataDetails,
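
On the driver, the new partial-success branch mirrors the existing failure handling: the partially written segment is deleted before the load is failed, so no half-loaded data stays visible, and the original bad-record message is surfaced. Roughly, with deleteSegment and audit as stand-ins for the CarbonLoaderUtil and logging calls:

    // Sketch: clean up first, then fail the load with the recorded cause.
    def failForBadRecords(errors: ExecutionErrors,
        deleteSegment: () => Unit, audit: String => Unit): Nothing = {
      deleteSegment()                      // remove the partially written segment
      audit("Data load is failed for <database>.<table>")
      throw new Exception(errors.errorMsg) // the message recorded on the executor
    }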

http://git-wip-us.apache.org/repos/asf/carbondata/blob/105b7c34/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index b4720a9..cab78fe 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -48,13 +48,14 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.constants.LoggerAction
 import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException}
 import org.apache.carbondata.spark._
-import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.load.{FailureCauses, _}
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil}
 
@@ -501,7 +502,7 @@ object CarbonDataRDDFactory {
       // CarbonCommonConstants.TABLE_SPLIT_PARTITION_DEFAULT_VALUE).toBoolean
       val isTableSplitPartition = false
       var blocksGroupBy: Array[(String, Array[BlockDetails])] = null
-      var status: Array[(String, LoadMetadataDetails)] = null
+      var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
       var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
 
       def loadDataFile(): Unit = {
@@ -701,6 +702,12 @@ object CarbonDataRDDFactory {
                 carbonLoadModel,
                 loadMetadataDetails)
             } catch {
+              case e: BadRecordFoundException =>
+                loadMetadataDetails
+                  .setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+                executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+                executionErrors.errorMsg = e.getMessage
+                LOGGER.info("Bad Record Found")
               case e: Exception =>
                 LOGGER.info("DataLoad failure")
                 LOGGER.error(e)
@@ -791,6 +798,11 @@ object CarbonDataRDDFactory {
                 else {
                   updateModel.get.executorErrors = resultOfBlock._2._2
                 }
+              } else if (resultOfBlock._2._1.getLoadStatus
+                .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
+                loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+                updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
+                updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
               }
             }
           ))
@@ -798,20 +810,20 @@ object CarbonDataRDDFactory {
         }
         else {
         val newStatusMap = scala.collection.mutable.Map.empty[String, String]
-        if (status.nonEmpty) {
-          status.foreach { eachLoadStatus =>
-            val state = newStatusMap.get(eachLoadStatus._1)
-            state match {
-              case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
-              case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
-                if eachLoadStatus._2.getLoadStatus ==
-                    CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
-              case _ =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+          if (status.nonEmpty) {
+            status.foreach { eachLoadStatus =>
+              val state = newStatusMap.get(eachLoadStatus._1)
+              state match {
+                case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
+                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
+                case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+                  if eachLoadStatus._2._1.getLoadStatus ==
+                     CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
+                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
+                case _ =>
+                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
+              }
             }
-          }
 
           newStatusMap.foreach {
             case (key, value) =>
@@ -864,6 +876,10 @@ object CarbonDataRDDFactory {
             }
           }
           return
+        } else if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
+                   updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
+                   carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
+          return
         } else {
           // in success case handle updation of the table status file.
           // success case.
@@ -913,7 +929,7 @@ object CarbonDataRDDFactory {
 
         return
       }
-        LOGGER.info("********starting clean up**********")
+      LOGGER.info("********starting clean up**********")
       if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
         LOGGER.info("********clean up done**********")
@@ -922,6 +938,17 @@ object CarbonDataRDDFactory {
         LOGGER.warn("Cannot write load metadata file as data load failed")
         throw new Exception(errorMessage)
       } else {
+        // check if data load fails due to bad record and throw data load failure due to
+        // bad record exception
+        if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
+            status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
+            carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
+          CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
+          LOGGER.info("********clean up done**********")
+          LOGGER.audit(s"Data load is failed for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          throw new Exception(status(0)._2._2.errorMsg)
+        }
         // if segment is empty then fail the data load
         if (!CarbonLoaderUtil.isValidSegment(carbonLoadModel, currentLoadCount)) {
           CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
@@ -932,7 +959,7 @@ object CarbonDataRDDFactory {
           LOGGER.warn("Cannot write load metadata file as data load failed")
           throw new Exception("No Data to load")
         }
-        val metadataDetails = status(0)._2
+        val metadataDetails = status(0)._2._1
         if (!isAgg) {
           writeDictionary(carbonLoadModel, result, false)
           val status = CarbonLoaderUtil.recordLoadMetadata(currentLoadCount, metadataDetails,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/105b7c34/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
index 5a476da..90d0ea5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConst
 import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.newflow.converter.FieldConverter;
 import org.apache.carbondata.processing.newflow.converter.RowConverter;
+import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
@@ -156,7 +157,7 @@ public class RowConverterImpl implements RowConverter {
       if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
         if (badRecordLogger.isDataLoadFail()) {
           String error = "Data load failed due to bad record: " + logHolder.getReason();
-          throw new CarbonDataLoadingException(error);
+          throw new BadRecordFoundException(error);
         }
         badRecordLogger.addBadRecordsToBuilder(copy.getData(), logHolder.getReason());
         logHolder.clear();
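
Throwing the more specific BadRecordFoundException here, instead of the generic loading exception, is what lets the executor-side catch blocks above recognise an intentional bad-record failure. In sketch form:

    // Sketch: raise the specific type so callers can pattern-match on it.
    def failOnBadRecord(reason: String): Nothing =
      throw new BadRecordFoundException("Data load failed due to bad record: " + reason)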

http://git-wip-us.apache.org/repos/asf/carbondata/blob/105b7c34/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
index 840f28c..eb95528 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.processing.newflow.exception;
 
-public class BadRecordFoundException extends Exception {
+public class BadRecordFoundException extends CarbonDataLoadingException {
   /**
    * default serial version ID.
    */
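
Making BadRecordFoundException a subclass of CarbonDataLoadingException means every existing handler that catches the generic type keeps working, while new code can match the specific type first. The catch ordering matters: the more specific case must precede the general one. A minimal sketch with stand-in classes (the real parent class may differ):

    class CarbonDataLoadingException(msg: String) extends RuntimeException(msg)
    class BadRecordFoundException(msg: String) extends CarbonDataLoadingException(msg)

    def handle(doLoad: () => Unit): Unit =
      try doLoad() catch {
        case e: BadRecordFoundException =>
          () // intentional bad-record failure: record it and finish gracefully
        case e: CarbonDataLoadingException =>
          throw e // genuine load failure: propagate so the task is retried as before
      }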

http://git-wip-us.apache.org/repos/asf/carbondata/blob/105b7c34/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ThreadStatusObserver.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ThreadStatusObserver.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ThreadStatusObserver.java
index d901ba4..56a32a3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ThreadStatusObserver.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ThreadStatusObserver.java
@@ -21,6 +21,11 @@ import java.util.concurrent.ExecutorService;
 
 public class ThreadStatusObserver {
 
+  /**
+   * lock object
+   */
+  private Object lock = new Object();
+
   private ExecutorService executorService;
 
   private Throwable throwable;
@@ -30,8 +35,18 @@ public class ThreadStatusObserver {
   }
 
   public void notifyFailed(Throwable throwable) {
-    executorService.shutdownNow();
-    this.throwable = throwable;
+    // Only the first failing thread should call for shutting down the executor service and
+    // should assign the throwable object else the actual cause for failure can be overridden as
+    // all the running threads will throw interrupted exception on calling shutdownNow and
+    // will override the throwable object
+    if (null == this.throwable) {
+      synchronized (lock) {
+        if (null == this.throwable) {
+          executorService.shutdownNow();
+          this.throwable = throwable;
+        }
+      }
+    }
   }
 
   public Throwable getThrowable() {
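
The notifyFailed change is the double-checked locking pattern: only the first failing thread shuts down the pool and records its throwable, so the InterruptedExceptions thrown by the other workers when shutdownNow interrupts them cannot overwrite the original cause. The same idea in Scala, with the field marked @volatile so the cheap unsynchronized first check reliably sees writes from other threads (a detail this pattern generally requires):

    import java.util.concurrent.ExecutorService

    class ThreadStatusObserver(executorService: ExecutorService) {
      private val lock = new Object
      @volatile private var throwable: Throwable = _ // first failure cause only

      def notifyFailed(t: Throwable): Unit = {
        if (throwable == null) {       // fast path: skip if a cause is already recorded
          lock.synchronized {
            if (throwable == null) {   // re-check under the lock to avoid a race
              executorService.shutdownNow() // interrupts the remaining worker threads
              throwable = t                 // keep only the first, real failure cause
            }
          }
        }
      }

      def getThrowable: Throwable = throwable
    }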