You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:08 UTC

[10/47] incubator-carbondata git commit: [CARBONDATA-93][Bug] Task not re-submitted by spark on data load failure (#850)

[CARBONDATA-93][Bug] Task not re-submitted by spark on data load failure (#850)

In case of a data load failure on any executor, the thrown exception is caught and logged as an error, but it is not re-thrown to Spark; therefore Spark does not resubmit the task to the available executors.
Fix: Catch the exception, log the error, and then re-throw the exception.

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/30e4f259
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/30e4f259
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/30e4f259

Branch: refs/heads/master
Commit: 30e4f25942f9c40ea4895410bdc25b679f6322dc
Parents: a81ed98
Author: manishgupta88 <to...@gmail.com>
Authored: Sat Jul 23 12:34:33 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Sat Jul 23 12:34:33 2016 +0530

----------------------------------------------------------------------
 .../spark/rdd/CarbonDataLoadRDD.scala           | 16 +---
 .../spark/rdd/CarbonDataRDDFactory.scala        | 99 +++++++++++---------
 2 files changed, 59 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30e4f259/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 06bd12f..18ea3b0 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -201,12 +201,10 @@ class CarbonDataLoadRDD[K, V](
               dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
               logInfo("Bad Record Found")
             } else {
-              dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-              LOGGER.error(e)
+              throw e
             }
             case e: Exception =>
-              dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-              LOGGER.error(e)
+              throw e
           } finally {
             if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(dataloadStatus)) {
               val newSlice = CarbonCommonConstants.LOAD_FOLDER + loadCount
@@ -216,10 +214,7 @@ class CarbonDataLoadRDD[K, V](
                 case e: Exception =>
                   LOGGER.error(e)
               }
-              dataloadStatus = checkAndLoadAggregationTable
-              if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(dataloadStatus)) {
-                logInfo("DataLoad failure")
-              } else if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+              if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
                   .equals(dataloadStatus)) {
                 logInfo("DataLoad complete")
                 logInfo("Data Load partially successful with LoadCount:" + loadCount)
@@ -229,15 +224,14 @@ class CarbonDataLoadRDD[K, V](
                 CarbonTimeStatisticsFactory.getLoadStatisticsInstance.printStatisticsInfo(
                   model.getPartitionId)
               }
-            } else {
-              logInfo("DataLoad failure")
             }
           }
         }
       } catch {
         case e: Exception =>
-          dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
           logInfo("DataLoad failure")
+          LOGGER.error(e)
+          throw e
       }
 
       def setModelAndBlocksInfo(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30e4f259/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7271ee0..b314f88 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -766,50 +766,59 @@ object CarbonDataRDDFactory extends Logging {
       CarbonLoaderUtil.checkAndCreateCarbonDataLocation(hdfsStoreLocation,
         carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName,
         partitioner.partitionCount, currentLoadCount.toString)
-      val status = new
-          CarbonDataLoadRDD(sc.sparkContext,
-            new DataLoadResultImpl(),
-            carbonLoadModel,
-            storeLocation,
-            hdfsStoreLocation,
-            kettleHomePath,
-            partitioner,
-            columinar,
-            currentRestructNumber,
-            currentLoadCount,
-            cubeCreationTime,
-            schemaLastUpdatedTime,
-            blocksGroupBy,
-            isTableSplitPartition
-          ).collect()
-      val newStatusMap = scala.collection.mutable.Map.empty[String, String]
-      status.foreach { eachLoadStatus =>
-        val state = newStatusMap.get(eachLoadStatus._1)
-        state match {
-          case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
-            newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
-          case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
-            if eachLoadStatus._2.getLoadStatus == CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
-            newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
-          case _ =>
-            newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
-        }
-      }
-
       var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-      newStatusMap.foreach {
-        case (key, value) =>
-          if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
-            loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-          } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
-            !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
-            loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+      var status: Array[(String, LoadMetadataDetails)] = null
+      try {
+        status = new
+            CarbonDataLoadRDD(sc.sparkContext,
+              new DataLoadResultImpl(),
+              carbonLoadModel,
+              storeLocation,
+              hdfsStoreLocation,
+              kettleHomePath,
+              partitioner,
+              columinar,
+              currentRestructNumber,
+              currentLoadCount,
+              cubeCreationTime,
+              schemaLastUpdatedTime,
+              blocksGroupBy,
+              isTableSplitPartition
+            ).collect()
+        val newStatusMap = scala.collection.mutable.Map.empty[String, String]
+        status.foreach { eachLoadStatus =>
+          val state = newStatusMap.get(eachLoadStatus._1)
+          state match {
+            case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
+              newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+            case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+              if eachLoadStatus._2.getLoadStatus ==
+                 CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
+              newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+            case _ =>
+              newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
           }
-      }
+        }
 
-      if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE &&
-        partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) {
-        loadStatus = partitionStatus
+        newStatusMap.foreach {
+          case (key, value) =>
+            if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
+              loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+            } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
+                       !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
+              loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+            }
+        }
+
+        if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE &&
+            partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) {
+          loadStatus = partitionStatus
+        }
+      } catch {
+        case ex: Throwable =>
+          loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+          logInfo("DataLoad failure")
+          logger.error(ex)
       }
 
       if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
@@ -823,12 +832,12 @@ object CarbonDataRDDFactory extends Logging {
           )
           message = "Aggregate table creation failure"
         } else {
-          val (result, _) = status(0)
-          val newSlice = CarbonCommonConstants.LOAD_FOLDER + result
           CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
           val aggTables = carbonTable.getAggregateTablesName
           if (null != aggTables && !aggTables.isEmpty) {
             // TODO:need to clean aggTable
+            val (result, _) = status(0)
+            val newSlice = CarbonCommonConstants.LOAD_FOLDER + result
             aggTables.asScala.foreach { aggTableName =>
               CarbonLoaderUtil
                 .deleteSlice(partitioner.partitionCount, carbonLoadModel.getDatabaseName,
@@ -837,12 +846,12 @@ object CarbonDataRDDFactory extends Logging {
                 )
             }
           }
-          message = "Dataload failure"
+          message = "DataLoad failure"
         }
         logInfo("********clean up done**********")
         logger.audit(s"Data load is failed for " +
           s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
-        logWarning("Unable to write load metadata file")
+        logWarning("Cannot write load metadata file as data load failed")
         throw new Exception(message)
       } else {
         val metadataDetails = status(0)._2