You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/11/23 18:43:35 UTC

[hudi] branch master updated: [HUDI-1396] Fix for preventing bootstrap datasource jobs from hanging via spark-submit (#2253)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 751e4ee  [HUDI-1396] Fix for preventing bootstrap datasource jobs from hanging via spark-submit (#2253)
751e4ee is described below

commit 751e4ee88219c9d7d212cd2abcfe57ec59bb322e
Author: wenningd <we...@gmail.com>
AuthorDate: Mon Nov 23 10:43:24 2020 -0800

    [HUDI-1396] Fix for preventing bootstrap datasource jobs from hanging via spark-submit (#2253)
    
    Co-authored-by: Wenning Ding <we...@amazon.com>
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 12 +++-
 .../functional/HoodieSparkSqlWriterSuite.scala     | 64 ++++++++++++++++++++--
 2 files changed, 69 insertions(+), 7 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 8cad2f3..c7d5c87 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -221,7 +221,8 @@ private[hudi] object HoodieSparkSqlWriter {
                 mode: SaveMode,
                 parameters: Map[String, String],
                 df: DataFrame,
-                hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty): Boolean = {
+                hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
+                hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {
 
     val sparkContext = sqlContext.sparkContext
     val path = parameters.getOrElse("path", throw new HoodieException("'path' must be set."))
@@ -263,8 +264,13 @@ private[hudi] object HoodieSparkSqlWriter {
     }
 
     val jsc = new JavaSparkContext(sqlContext.sparkContext)
-    val writeClient = DataSourceUtils.createHoodieClient(jsc, schema, path, tableName, mapAsJavaMap(parameters))
-    writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
+    val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
+      schema, path, tableName, mapAsJavaMap(parameters)))
+    try {
+      writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
+    } finally {
+      writeClient.close()
+    }
     val metaSyncSuccess = metaSync(parameters, basePath, jsc.hadoopConfiguration)
     metaSyncSuccess
   }
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index e165904..41a45b2 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -17,17 +17,18 @@
 
 package org.apache.hudi.functional
 
+import java.time.Instant
 import java.util
-import java.util.{Date, UUID}
+import java.util.{Collections, Date, UUID}
 
 import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.{SparkRDDWriteClient, TestBootstrap}
 import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.keygen.SimpleKeyGenerator
+import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
 import org.apache.spark.SparkContext
@@ -341,6 +342,61 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       }
     })
 
+  List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    .foreach(tableType => {
+      test("test HoodieSparkSqlWriter functionality with datasource bootstrap for " + tableType) {
+        initSparkContext("test_bootstrap_datasource")
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+        val srcPath = java.nio.file.Files.createTempDirectory("hoodie_bootstrap_source_path")
+
+        try {
+
+          val hoodieFooTableName = "hoodie_foo_tbl"
+
+          val sourceDF = TestBootstrap.generateTestRawTripDataset(Instant.now.toEpochMilli, 0, 100, Collections.emptyList(), sc,
+            spark.sqlContext)
+
+          // Write the generated source data to srcPath as parquet, without partitioning
+          sourceDF.write
+            .format("parquet")
+            .mode(SaveMode.Overwrite)
+            .save(srcPath.toAbsolutePath.toString)
+
+          val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+            HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP -> srcPath.toAbsolutePath.toString,
+            HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+            DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
+            HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM -> "4",
+            DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL,
+            DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+            DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+            HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS -> classOf[NonpartitionedKeyGenerator].getCanonicalName)
+          val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+          val client = spy(DataSourceUtils.createHoodieClient( // wrapped in a spy so the close() call can be verified below
+            new JavaSparkContext(sc),
+            null,
+            path.toAbsolutePath.toString,
+            hoodieFooTableName,
+            mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
+
+          HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Append, fooTableParams, spark.emptyDataFrame, Option.empty,
+            Option(client)) // pass the spied client in so bootstrap uses it instead of creating its own
+
+          // Verify that bootstrap called close() on the supplied write client exactly once
+          verify(client, times(1)).close()
+
+          // Read back the parquet files under the Hudi table path and expect 100 records
+          val actualDf = sqlContext.read.parquet(path.toAbsolutePath.toString)
+          assert(actualDf.count == 100)
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+          FileUtils.deleteDirectory(srcPath.toFile)
+        }
+      }
+    })
+
   case class Test(uuid: String, ts: Long)
 
   import scala.collection.JavaConverters