You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2019/12/20 16:11:46 UTC

[GitHub] [incubator-hudi] nsivabalan commented on a change in pull request #1114: [HUDI-438] Merge duplicated code fragment

nsivabalan commented on a change in pull request #1114: [HUDI-438] Merge duplicated code fragment
URL: https://github.com/apache/incubator-hudi/pull/1114#discussion_r360439969
 
 

 ##########
 File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 ##########
 @@ -225,52 +179,7 @@ private[hudi] object HoodieSparkSqlWriter {
       // Issue deletes
       commitTime = client.startCommit()
       writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
-      val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
-      writeSuccessful =
-        if (errorCount == 0) {
-          log.info("No errors. Proceeding to commit the write.")
-          val metaMap = parameters.filter(kv =>
-            kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
-          val commitSuccess = if (metaMap.isEmpty) {
-            client.commit(commitTime, writeStatuses)
-          } else {
-            client.commit(commitTime, writeStatuses,
-              common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
-          }
-
-          if (commitSuccess) {
-            log.info("Commit " + commitTime + " successful!")
-          }
-          else {
-            log.info("Commit " + commitTime + " failed!")
-          }
-
-          val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
-          val syncHiveSucess = if (hiveSyncEnabled) {
-            log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
-            val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
-            syncHive(basePath, fs, parameters)
-          } else {
-            true
-          }
-          client.close()
-          commitSuccess && syncHiveSucess
-        } else {
-          log.error(s"$operation failed with ${errorCount} errors :");
-          if (log.isTraceEnabled) {
-            log.trace("Printing out the top 100 errors")
-            writeStatuses.rdd.filter(ws => ws.hasErrors)
-              .take(100)
-              .foreach(ws => {
-                log.trace("Global error :", ws.getGlobalError)
-                if (ws.getErrors.size() > 0) {
-                  ws.getErrors.foreach(kt =>
-                    log.trace(s"Error for key: ${kt._1}", kt._2))
-                }
-              })
-          }
-          false
-        }
+      checkWriteStatus(writeStatuses, parameters, client, commitTime, basePath, operation, jsc)
 
 Review comment:
   This is good enough. But did you get a chance to try moving checkWriteStatus(writeStatuses, parameters, client, commitTime, basePath, operation, jsc) outside the if/else block? In other words, instead of calling it inside both the if branch and the else branch, can we call it once after the if/else block, if that's doable?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services