Posted to commits@hudi.apache.org by vi...@apache.org on 2021/05/18 23:11:53 UTC

[hudi] branch master updated: [HUDI-1806] Honoring skipROSuffix in spark ds (#2882)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d1f592  [HUDI-1806] Honoring skipROSuffix in spark ds (#2882)
5d1f592 is described below

commit 5d1f592395fe23459091ac2c7a5d539cdd9be819
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Tue May 18 19:11:39 2021 -0400

    [HUDI-1806] Honoring skipROSuffix in spark ds (#2882)
    
    * Honoring skipROSuffix in spark ds
    
    * Adding tests
    
    * fixing scala checkstyle issue
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  2 ++
 .../functional/HoodieSparkSqlWriterSuite.scala     | 38 ++++++++++++++++++++--
 2 files changed, 37 insertions(+), 3 deletions(-)
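
For context on what this change enables: HIVE_SKIP_RO_SUFFIX is a Hive-sync
option on the Spark datasource write path. A minimal usage sketch, assuming
the option key is "hoodie.datasource.hive_sync.skip_ro_suffix" and using
placeholder table and path names (given a SparkSession `spark`):

    // Sketch: enabling skipROSuffix on a MERGE_ON_READ write, so the
    // read-optimized Hive table is registered without the "_ro" suffix.
    val df = spark.createDataFrame(Seq(("id1", 1L), ("id2", 2L))).toDF("uuid", "ts")
    df.write.format("hudi").
      option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
      option("hoodie.datasource.write.recordkey.field", "uuid").
      option("hoodie.datasource.write.precombine.field", "ts").
      option("hoodie.table.name", "test_hoodie").                    // placeholder
      option("hoodie.datasource.hive_sync.enable", "true").
      option("hoodie.datasource.hive_sync.skip_ro_suffix", "true").
      mode("append").
      save("/tmp/hoodie_test")                                       // placeholder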

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 3a5b51e..9653e6f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -412,6 +412,8 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig.hiveUser = parameters(HIVE_USER_OPT_KEY)
     hiveSyncConfig.hivePass = parameters(HIVE_PASS_OPT_KEY)
     hiveSyncConfig.jdbcUrl = parameters(HIVE_URL_OPT_KEY)
+    hiveSyncConfig.skipROSuffix = parameters.getOrElse(HIVE_SKIP_RO_SUFFIX,
+      DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL).toBoolean
     hiveSyncConfig.partitionFields =
       ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
     hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
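
The getOrElse above keeps the old behavior for jobs that never set the key:
absent the option, the value falls back to the default string and parses to
false. Downstream, skipROSuffix decides how the read-optimized table is named
when a MERGE_ON_READ table is synced to Hive. A paraphrased sketch of that
naming decision (the real logic lives in HiveSyncTool; the "_ro" literal here
is an assumption standing in for its suffix constant):

    // Sketch: effect of skipROSuffix on the read-optimized table name.
    def roTableName(baseTableName: String, skipROSuffix: Boolean): String =
      if (skipROSuffix) baseTableName        // e.g. "test_hoodie"
      else baseTableName + "_ro"             // default: "test_hoodie_ro"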
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 269c4ac..d53b59e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -20,7 +20,6 @@ package org.apache.hudi.functional
 import java.time.Instant
 import java.util
 import java.util.{Collections, Date, UUID}
-
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
@@ -43,7 +42,7 @@ import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.{FunSuite, Matchers}
 
 import scala.collection.JavaConversions._
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 
 class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
@@ -503,7 +502,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     val params = Map(
       "path" -> basePath,
       DataSourceWriteOptions.TABLE_NAME_OPT_KEY -> "test_hoodie",
-      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition"
+      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition",
+      DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX -> "true"
     )
     val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
     val newParams = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter,
@@ -518,6 +518,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
       new Path(basePath), newParams).asInstanceOf[HiveSyncConfig]
 
+    assertTrue(hiveSyncConfig.skipROSuffix)
     assertResult("spark.sql.sources.provider=hudi\n" +
       "spark.sql.sources.schema.partCol.0=partition\n" +
       "spark.sql.sources.schema.numParts=1\n" +
@@ -530,6 +531,37 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     assertResult("path=/tmp/hoodie_test")(hiveSyncConfig.serdeProperties)
   }
 
+  test("Test build sync config for skip Ro Suffix vals") {
+    initSparkContext("test build sync config for skip Ro suffix vals")
+    val addSqlTablePropertiesMethod =
+      HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties",
+        classOf[SQLConf], classOf[StructType], classOf[Map[_, _]])
+    addSqlTablePropertiesMethod.setAccessible(true)
+
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val basePath = "/tmp/hoodie_test"
+    val params = Map(
+      "path" -> basePath,
+      DataSourceWriteOptions.TABLE_NAME_OPT_KEY -> "test_hoodie",
+      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition"
+    )
+    val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
+    val newParams = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter,
+      spark.sessionState.conf, structType, parameters)
+      .asInstanceOf[Map[String, String]]
+
+    val buildSyncConfigMethod =
+      HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path],
+        classOf[Map[_, _]])
+    buildSyncConfigMethod.setAccessible(true)
+
+    val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
+      new Path(basePath), newParams).asInstanceOf[HiveSyncConfig]
+
+    assertFalse(hiveSyncConfig.skipROSuffix)
+  }
+
   case class Test(uuid: String, ts: Long)
 
   import scala.collection.JavaConverters
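
Taken together, the two assertions pin both sides of the default: the first
test sets HIVE_SKIP_RO_SUFFIX to "true" and expects skipROSuffix to be true,
while the new test omits the option and expects false. A quick way to spot-check
the end-to-end effect after a sync (database and table names are placeholders):

    // Hypothetical check: after a write with skip_ro_suffix=true, expect the
    // read-optimized table to appear as "test_hoodie", not "test_hoodie_ro".
    spark.sql("show tables in default").show(false)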