You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:29 UTC

[31/47] incubator-carbondata git commit: [CARBONDATA-111]Need to catch exception in compaction and rethrow it to spark. (#858)

[CARBONDATA-111]Need to catch exception in compaction  and rethrow it to spark. (#858)

 Need to catch the exception and rethrow it to Spark. Need to unlock the compaction lock in case of an exception.
 The property spark.job.interruptOnCancel needs to be set to true so that if the user kills the job in the UI,
the task will also be killed. If any exception occurs in compaction, then clean up the HDFS folders.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/973def99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/973def99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/973def99

Branch: refs/heads/master
Commit: 973def99f72c267e625497daf2ece18c18767d73
Parents: bdc1321
Author: ravikiran23 <ra...@gmail.com>
Authored: Thu Jul 28 21:03:20 2016 +0530
Committer: sujith71955 <su...@gmail.com>
Committed: Thu Jul 28 21:03:20 2016 +0530

----------------------------------------------------------------------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  88 ++++++------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 136 ++++++++++---------
 2 files changed, 122 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/973def99/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 15de7bf..54cf1bd 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -400,53 +400,44 @@ object CarbonDataRDDFactory extends Logging {
       new Thread {
         override def run(): Unit = {
 
-          while (loadsToMerge.size() > 1) {
-          // Deleting the any partially loaded data if present.
-          // in some case the segment folder which is present in store will not have entry in
-          // status.
-          // so deleting those folders.
           try {
-            CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
-          }
-          catch {
-            case e: Exception =>
-              logger
-                .error("Exception in compaction thread while clean up of stale segments " + e
-                  .getMessage
-                )
-          }
-            val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
-              CarbonCommonConstants
-                .DEFAULT_COLLECTION_SIZE
-            )
+            while (loadsToMerge.size() > 1) {
+              deletePartialLoadsInCompaction(carbonLoadModel)
+              val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
+                CarbonCommonConstants
+                  .DEFAULT_COLLECTION_SIZE
+              )
 
-            scanSegmentsAndSubmitJob(futureList)
+              scanSegmentsAndSubmitJob(futureList)
 
-            futureList.asScala.foreach(future => {
-              try {
+              futureList.asScala.foreach(future => {
                 future.get
               }
-              catch {
-                case e: Exception =>
-                  logger.error("Exception in compaction thread " + e.getMessage)
-              }
+              )
+              // scan again and deterrmine if anything is there to merge again.
+              readLoadMetadataDetails(carbonLoadModel, hdfsStoreLocation)
+              segList = carbonLoadModel.getLoadMetadataDetails
+
+              loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+                hdfsStoreLocation,
+                carbonLoadModel,
+                partitioner.partitionCount,
+                compactionModel.compactionSize,
+                segList,
+                compactionModel.compactionType
+              )
             }
-            )
-            // scan again and deterrmine if anything is there to merge again.
-            readLoadMetadataDetails(carbonLoadModel, hdfsStoreLocation)
-            segList = carbonLoadModel.getLoadMetadataDetails
-
-            loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-              hdfsStoreLocation,
-              carbonLoadModel,
-              partitioner.partitionCount,
-              compactionModel.compactionSize,
-              segList,
-              compactionModel.compactionType
-            )
           }
-          executor.shutdown()
-          compactionLock.unlock()
+          catch {
+            case e: Exception =>
+              logger.error("Exception in compaction thread " + e.getMessage)
+              throw e
+          }
+          finally {
+            executor.shutdownNow()
+            deletePartialLoadsInCompaction(carbonLoadModel)
+            compactionLock.unlock()
+          }
         }
       }.start
     }
@@ -504,6 +495,23 @@ object CarbonDataRDDFactory extends Logging {
     }
   }
 
+  def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = {
+    // Deleting the any partially loaded data if present.
+    // in some case the segment folder which is present in store will not have entry in
+    // status.
+    // so deleting those folders.
+    try {
+      CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
+    }
+    catch {
+      case e: Exception =>
+        logger
+          .error("Exception in compaction thread while clean up of stale segments " + e
+            .getMessage
+          )
+    }
+  }
+
   def loadCarbonData(sc: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storeLocation: String,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/973def99/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
index 0129a2c..254d51b 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -57,6 +57,7 @@ class CarbonMergerRDD[K, V](
 
   val defaultParallelism = sc.defaultParallelism
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
+  sc.setLocalProperty("spark.job.interruptOnCancel", "true")
 
   val storeLocation = carbonMergerMapping.storeLocation
   val hdfsStoreLocation = carbonMergerMapping.hdfsStoreLocation
@@ -68,83 +69,94 @@ class CarbonMergerRDD[K, V](
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
-      var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-      carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-      val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
 
-      val tempLocationKey: String = carbonLoadModel.getDatabaseName + '_' + carbonLoadModel
-        .getTableName + carbonLoadModel.getTaskNo
-      CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+      var mergeStatus = false
+      try {
+        var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+        carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+        val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
 
-      // sorting the table block info List.
-      var tableBlockInfoList = carbonSparkPartition.tableBlockInfos
+        val tempLocationKey: String = carbonLoadModel.getDatabaseName + '_' + carbonLoadModel
+          .getTableName + carbonLoadModel.getTaskNo
+        CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
 
-      Collections.sort(tableBlockInfoList)
+        // sorting the table block info List.
+        var tableBlockInfoList = carbonSparkPartition.tableBlockInfos
 
-      val segmentMapping: java.util.Map[String, TaskBlockInfo] =
-        CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
+        Collections.sort(tableBlockInfoList)
 
-      val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
-        CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
+        val segmentMapping: java.util.Map[String, TaskBlockInfo] =
+          CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
 
-      carbonLoadModel.setStorePath(hdfsStoreLocation)
+        val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
+          CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
 
-      // taking the last table block info for getting the segment properties.
-      val listMetadata = dataFileMetadataSegMapping.get(tableBlockInfoList.get
-      (tableBlockInfoList.size()-1).getSegmentId())
+        carbonLoadModel.setStorePath(hdfsStoreLocation)
 
-      val colCardinality: Array[Int] = listMetadata.get(listMetadata.size() - 1).getSegmentInfo
-        .getColumnCardinality
+        // taking the last table block info for getting the segment properties.
+        val listMetadata = dataFileMetadataSegMapping.get(tableBlockInfoList.get
+        (tableBlockInfoList.size() - 1).getSegmentId()
+        )
 
-      val segmentProperties = new SegmentProperties(
-        listMetadata.get(listMetadata.size() - 1).getColumnInTable,
-        colCardinality
-      )
+        val colCardinality: Array[Int] = listMetadata.get(listMetadata.size() - 1).getSegmentInfo
+          .getColumnCardinality
 
-      val exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, schemaName,
-        factTableName, hdfsStoreLocation, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
-        dataFileMetadataSegMapping
-      )
+        val segmentProperties = new SegmentProperties(
+          listMetadata.get(listMetadata.size() - 1).getColumnInTable,
+          colCardinality
+        )
 
-      // fire a query and get the results.
-      var result2: util.List[RawResultIterator] = null
-      try {
-        result2 = exec.processTableBlocks()
-      } catch {
-        case e: Throwable =>
-          exec.clearDictionaryFromQueryModel
-          LOGGER.error(e)
-          if (null != e.getMessage) {
-            sys.error("Exception occurred in query execution :: " + e.getMessage)
-          } else {
-            sys.error("Exception occurred in query execution.Please check logs.")
-          }
-      }
+        val exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, schemaName,
+          factTableName, hdfsStoreLocation, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+          dataFileMetadataSegMapping
+        )
 
-      val mergeNumber = mergedLoadName
-        .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
-          CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
+        // fire a query and get the results.
+        var result2: util.List[RawResultIterator] = null
+        try {
+          result2 = exec.processTableBlocks()
+        } catch {
+          case e: Throwable =>
+            exec.clearDictionaryFromQueryModel
+            LOGGER.error(e)
+            if (null != e.getMessage) {
+              sys.error("Exception occurred in query execution :: " + e.getMessage)
+            } else {
+              sys.error("Exception occurred in query execution.Please check logs.")
+            }
+        }
+
+        val mergeNumber = mergedLoadName
+          .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
+            CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
+          )
+
+        val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(schemaName,
+          factTableName,
+          carbonLoadModel.getTaskNo,
+          "0",
+          mergeNumber
         )
 
-      val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(schemaName,
-        factTableName,
-        carbonLoadModel.getTaskNo,
-        "0",
-        mergeNumber
-      )
+        carbonLoadModel.setSegmentId(mergeNumber)
+        carbonLoadModel.setPartitionId("0")
+        val merger =
+          new RowResultMerger(result2,
+            schemaName,
+            factTableName,
+            segmentProperties,
+            tempStoreLoc,
+            carbonLoadModel,
+            colCardinality
+          )
+        mergeStatus = merger.mergerSlice()
 
-      carbonLoadModel.setSegmentId(mergeNumber)
-      carbonLoadModel.setPartitionId("0")
-      val merger =
-        new RowResultMerger(result2,
-        schemaName,
-        factTableName,
-        segmentProperties,
-        tempStoreLoc,
-        carbonLoadModel,
-        colCardinality
-      )
-      val mergeStatus = merger.mergerSlice()
+      }
+      catch {
+        case e: Exception =>
+          LOGGER.error(e)
+          throw e
+      }
 
       var finished = false