You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/05/24 05:29:20 UTC

[hudi] branch master updated: [HUDI-1873] collect() call causing issues with very large upserts (#2907)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 369a849  [HUDI-1873] collect() call causing issues with very large upserts (#2907)
369a849 is described below

commit 369a8493378f069d5471098d75a251fa67d99737
Author: mpouttu <83...@users.noreply.github.com>
AuthorDate: Sun May 23 22:29:01 2021 -0700

    [HUDI-1873] collect() call causing issues with very large upserts (#2907)
    
    Co-authored-by: Sivabalan Narayanan <si...@uber.com>
---
 .../src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 9653e6f..93442d5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -540,9 +540,8 @@ private[hudi] object HoodieSparkSqlWriter {
                                              jsc: JavaSparkContext,
                                              tableInstantInfo: TableInstantInfo
                                              ): (Boolean, common.util.Option[java.lang.String]) = {
-    val errorCount = writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count()
-    if (errorCount == 0) {
-      log.info("No errors. Proceeding to commit the write.")
+    if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
+      log.info("Proceeding to commit the write.")
       val metaMap = parameters.filter(kv =>
         kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
       val commitSuccess =
@@ -559,7 +558,7 @@ private[hudi] object HoodieSparkSqlWriter {
       }
 
       val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
-      val compactionInstant : common.util.Option[java.lang.String] =
+      val compactionInstant: common.util.Option[java.lang.String] =
         if (asyncCompactionEnabled) {
           client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
         } else {
@@ -568,7 +567,7 @@ private[hudi] object HoodieSparkSqlWriter {
 
       log.info(s"Compaction Scheduled is $compactionInstant")
 
-      val metaSyncSuccess =  metaSync(spark, parameters, tableInstantInfo.basePath, schema)
+      val metaSyncSuccess = metaSync(spark, parameters, tableInstantInfo.basePath, schema)
 
       log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
       if (!asyncCompactionEnabled) {
@@ -576,7 +575,7 @@ private[hudi] object HoodieSparkSqlWriter {
       }
       (commitSuccess && metaSyncSuccess, compactionInstant)
     } else {
-      log.error(s"${tableInstantInfo.operation} failed with $errorCount errors :")
+      log.error(s"${tableInstantInfo.operation} failed with errors")
       if (log.isTraceEnabled) {
         log.trace("Printing out the top 100 errors")
         writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors)