You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:08 UTC
[10/47] incubator-carbondata git commit: [CARBONDATA-93][Bug] Task
not re-submitted by spark on data load failure (#850)
[CARBONDATA-93][Bug] Task not re-submitted by spark on data load failure (#850)
When a data load fails on any executor, the exception thrown is caught and logged as an error, but it is not re-thrown to Spark; as a result, Spark does not resubmit the task to the available executors.
Fix: Catch the exception, log the error, and then re-throw the exception.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/30e4f259
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/30e4f259
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/30e4f259
Branch: refs/heads/master
Commit: 30e4f25942f9c40ea4895410bdc25b679f6322dc
Parents: a81ed98
Author: manishgupta88 <to...@gmail.com>
Authored: Sat Jul 23 12:34:33 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Sat Jul 23 12:34:33 2016 +0530
----------------------------------------------------------------------
.../spark/rdd/CarbonDataLoadRDD.scala | 16 +---
.../spark/rdd/CarbonDataRDDFactory.scala | 99 +++++++++++---------
2 files changed, 59 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30e4f259/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 06bd12f..18ea3b0 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -201,12 +201,10 @@ class CarbonDataLoadRDD[K, V](
dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
logInfo("Bad Record Found")
} else {
- dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- LOGGER.error(e)
+ throw e
}
case e: Exception =>
- dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- LOGGER.error(e)
+ throw e
} finally {
if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(dataloadStatus)) {
val newSlice = CarbonCommonConstants.LOAD_FOLDER + loadCount
@@ -216,10 +214,7 @@ class CarbonDataLoadRDD[K, V](
case e: Exception =>
LOGGER.error(e)
}
- dataloadStatus = checkAndLoadAggregationTable
- if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(dataloadStatus)) {
- logInfo("DataLoad failure")
- } else if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+ if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
.equals(dataloadStatus)) {
logInfo("DataLoad complete")
logInfo("Data Load partially successful with LoadCount:" + loadCount)
@@ -229,15 +224,14 @@ class CarbonDataLoadRDD[K, V](
CarbonTimeStatisticsFactory.getLoadStatisticsInstance.printStatisticsInfo(
model.getPartitionId)
}
- } else {
- logInfo("DataLoad failure")
}
}
}
} catch {
case e: Exception =>
- dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
logInfo("DataLoad failure")
+ LOGGER.error(e)
+ throw e
}
def setModelAndBlocksInfo(): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30e4f259/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7271ee0..b314f88 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -766,50 +766,59 @@ object CarbonDataRDDFactory extends Logging {
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(hdfsStoreLocation,
carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName,
partitioner.partitionCount, currentLoadCount.toString)
- val status = new
- CarbonDataLoadRDD(sc.sparkContext,
- new DataLoadResultImpl(),
- carbonLoadModel,
- storeLocation,
- hdfsStoreLocation,
- kettleHomePath,
- partitioner,
- columinar,
- currentRestructNumber,
- currentLoadCount,
- cubeCreationTime,
- schemaLastUpdatedTime,
- blocksGroupBy,
- isTableSplitPartition
- ).collect()
- val newStatusMap = scala.collection.mutable.Map.empty[String, String]
- status.foreach { eachLoadStatus =>
- val state = newStatusMap.get(eachLoadStatus._1)
- state match {
- case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
- newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
- case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
- if eachLoadStatus._2.getLoadStatus == CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
- newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
- case _ =>
- newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
- }
- }
-
var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- newStatusMap.foreach {
- case (key, value) =>
- if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
- loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
- !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
- loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+ var status: Array[(String, LoadMetadataDetails)] = null
+ try {
+ status = new
+ CarbonDataLoadRDD(sc.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ storeLocation,
+ hdfsStoreLocation,
+ kettleHomePath,
+ partitioner,
+ columinar,
+ currentRestructNumber,
+ currentLoadCount,
+ cubeCreationTime,
+ schemaLastUpdatedTime,
+ blocksGroupBy,
+ isTableSplitPartition
+ ).collect()
+ val newStatusMap = scala.collection.mutable.Map.empty[String, String]
+ status.foreach { eachLoadStatus =>
+ val state = newStatusMap.get(eachLoadStatus._1)
+ state match {
+ case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
+ newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+ case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+ if eachLoadStatus._2.getLoadStatus ==
+ CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
+ newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+ case _ =>
+ newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
}
- }
+ }
- if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE &&
- partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) {
- loadStatus = partitionStatus
+ newStatusMap.foreach {
+ case (key, value) =>
+ if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
+ loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+ } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
+ !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
+ loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+ }
+ }
+
+ if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE &&
+ partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) {
+ loadStatus = partitionStatus
+ }
+ } catch {
+ case ex: Throwable =>
+ loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+ logInfo("DataLoad failure")
+ logger.error(ex)
}
if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
@@ -823,12 +832,12 @@ object CarbonDataRDDFactory extends Logging {
)
message = "Aggregate table creation failure"
} else {
- val (result, _) = status(0)
- val newSlice = CarbonCommonConstants.LOAD_FOLDER + result
CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
val aggTables = carbonTable.getAggregateTablesName
if (null != aggTables && !aggTables.isEmpty) {
// TODO:need to clean aggTable
+ val (result, _) = status(0)
+ val newSlice = CarbonCommonConstants.LOAD_FOLDER + result
aggTables.asScala.foreach { aggTableName =>
CarbonLoaderUtil
.deleteSlice(partitioner.partitionCount, carbonLoadModel.getDatabaseName,
@@ -837,12 +846,12 @@ object CarbonDataRDDFactory extends Logging {
)
}
}
- message = "Dataload failure"
+ message = "DataLoad failure"
}
logInfo("********clean up done**********")
logger.audit(s"Data load is failed for " +
s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
- logWarning("Unable to write load metadata file")
+ logWarning("Cannot write load metadata file as data load failed")
throw new Exception(message)
} else {
val metadataDetails = status(0)._2