You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/05/24 05:29:20 UTC
[hudi] branch master updated: [HUDI-1873] collect() call causing issues with very large upserts (#2907)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 369a849 [HUDI-1873] collect() call causing issues with very large upserts (#2907)
369a849 is described below
commit 369a8493378f069d5471098d75a251fa67d99737
Author: mpouttu <83...@users.noreply.github.com>
AuthorDate: Sun May 23 22:29:01 2021 -0700
[HUDI-1873] collect() call causing issues with very large upserts (#2907)
Co-authored-by: Sivabalan Narayanan <si...@uber.com>
---
.../src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 9653e6f..93442d5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -540,9 +540,8 @@ private[hudi] object HoodieSparkSqlWriter {
jsc: JavaSparkContext,
tableInstantInfo: TableInstantInfo
): (Boolean, common.util.Option[java.lang.String]) = {
- val errorCount = writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count()
- if (errorCount == 0) {
- log.info("No errors. Proceeding to commit the write.")
+ if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
+ log.info("Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
val commitSuccess =
@@ -559,7 +558,7 @@ private[hudi] object HoodieSparkSqlWriter {
}
val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
- val compactionInstant : common.util.Option[java.lang.String] =
+ val compactionInstant: common.util.Option[java.lang.String] =
if (asyncCompactionEnabled) {
client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
} else {
@@ -568,7 +567,7 @@ private[hudi] object HoodieSparkSqlWriter {
log.info(s"Compaction Scheduled is $compactionInstant")
- val metaSyncSuccess = metaSync(spark, parameters, tableInstantInfo.basePath, schema)
+ val metaSyncSuccess = metaSync(spark, parameters, tableInstantInfo.basePath, schema)
log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
if (!asyncCompactionEnabled) {
@@ -576,7 +575,7 @@ private[hudi] object HoodieSparkSqlWriter {
}
(commitSuccess && metaSyncSuccess, compactionInstant)
} else {
- log.error(s"${tableInstantInfo.operation} failed with $errorCount errors :")
+ log.error(s"${tableInstantInfo.operation} failed with errors")
if (log.isTraceEnabled) {
log.trace("Printing out the top 100 errors")
writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors)