Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/24 08:29:31 UTC

[GitHub] [hudi] KnightChess commented on issue #6679: [SUPPORT] Expect job status failed in spark batch model

KnightChess commented on issue #6679:
URL: https://github.com/apache/hudi/issues/6679#issuecomment-1288632725

   @nsivabalan thanks for the reply. I am on version 0.11.0, but ours is a batch job, not a streaming job. Following the config you pointed to in the code, I found the exception-handling logic for streaming mode. I will try to use it as a reference to implement my own logic in the batch job. Thanks.
   ```scala
         Try(
           HoodieSparkSqlWriter.write(
             sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
         ) match {
           case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
             log.info(s"Micro batch id=$batchId succeeded"
               + (commitOps.isPresent match {
               case true => s" for commit=${commitOps.get()}"
               case _ => s" with no new commits"
             }))
             writeClient = Some(client)
             hoodieTableConfig = Some(tableConfig)
             if (compactionInstantOps.isPresent) {
               asyncCompactorService.enqueuePendingAsyncServiceInstant(
                 new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
             }
             if (clusteringInstant.isPresent) {
               asyncClusteringService.enqueuePendingAsyncServiceInstant(new HoodieInstant(
                 State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get()
               ))
             }
             Success((true, commitOps, compactionInstantOps))
           case Failure(e) =>
             // clean up persist rdds in the write process
             data.sparkSession.sparkContext.getPersistentRDDs
               .foreach {
                 case (id, rdd) =>
                   try {
                     rdd.unpersist()
                   } catch {
                     case t: Exception => log.warn("Got excepting trying to unpersist rdd", t)
                   }
               }
             log.error(s"Micro batch id=$batchId threw following exception: ", e)
             if (ignoreFailedBatch) {
               log.info(s"Ignore the exception and move on streaming as per " +
                 s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
               Success((true, None, None))
             } else {
               if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
               Failure(e)
             }
           case Success((false, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
             log.error(s"Micro batch id=$batchId ended up with errors"
               + (commitOps.isPresent match {
                 case true =>  s" for commit=${commitOps.get()}"
                 case _ => s""
               }))
             if (ignoreFailedBatch) {
               log.info(s"Ignore the errors and move on streaming as per " +
                 s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
               Success((true, None, None))
             } else {
               if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
               Failure(new HoodieCorruptedDataException(s"Micro batch id=$batchId ended up with errors"))
             }
         }
   ```
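
   A minimal sketch of how the same pattern could be carried over to a batch job, assuming the goal is for the job to end in a failed state whenever the write throws or reports errors. It does not call Hudi: `write` and `cleanup` are hypothetical function parameters standing in for the actual write call and for unpersisting cached RDDs, and `BatchWriteGuard`, `runOrFail` and `BatchWriteFailedException` are names made up for illustration. Unlike the streaming code above, there is no ignore-failed-batch or retry branch, and cleanup also runs when the write reports errors without throwing.

   ```scala
   import scala.util.{Failure, Success, Try}

   object BatchWriteGuard {

     /** Hypothetical exception for a write that returned normally but reported errors. */
     final class BatchWriteFailedException(msg: String) extends RuntimeException(msg)

     /**
      * Runs `write` once. `write` is assumed to return true on success and
      * false when it completed with errors. On either kind of failure,
      * `cleanup` is attempted and an exception is thrown so that the
      * surrounding batch job fails instead of finishing silently.
      */
     def runOrFail(write: () => Boolean, cleanup: () => Unit): Unit =
       Try(write()) match {
         case Success(true) =>
           () // write succeeded, nothing else to do
         case Success(false) =>
           Try(cleanup()) // best effort: a cleanup failure must not hide the write failure
           throw new BatchWriteFailedException("Batch write ended up with errors")
         case Failure(e) =>
           Try(cleanup()) // best effort, same as above
           throw e // rethrow the original exception unchanged
       }
   }
   ```

   In a real Spark job, `write` would wrap the actual Hudi write and `cleanup` would loop over `sparkContext.getPersistentRDDs` and call `unpersist()` on each entry, as the streaming code does.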

