You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2021/05/18 23:11:53 UTC
[hudi] branch master updated: [HUDI-1806] Honoring skipROSuffix in
spark ds (#2882)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5d1f592 [HUDI-1806] Honoring skipROSuffix in spark ds (#2882)
5d1f592 is described below
commit 5d1f592395fe23459091ac2c7a5d539cdd9be819
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Tue May 18 19:11:39 2021 -0400
[HUDI-1806] Honoring skipROSuffix in spark ds (#2882)
* Honoring skipROSuffix in spark ds
* Adding tests
* fixing scala checkstyle issue
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 2 ++
.../functional/HoodieSparkSqlWriterSuite.scala | 38 ++++++++++++++++++++--
2 files changed, 37 insertions(+), 3 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 3a5b51e..9653e6f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -412,6 +412,8 @@ private[hudi] object HoodieSparkSqlWriter {
hiveSyncConfig.hiveUser = parameters(HIVE_USER_OPT_KEY)
hiveSyncConfig.hivePass = parameters(HIVE_PASS_OPT_KEY)
hiveSyncConfig.jdbcUrl = parameters(HIVE_URL_OPT_KEY)
+ hiveSyncConfig.skipROSuffix = parameters.getOrElse(HIVE_SKIP_RO_SUFFIX,
+ DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL).toBoolean
hiveSyncConfig.partitionFields =
ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 269c4ac..d53b59e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -20,7 +20,6 @@ package org.apache.hudi.functional
import java.time.Instant
import java.util
import java.util.{Collections, Date, UUID}
-
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
@@ -43,7 +42,7 @@ import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.{FunSuite, Matchers}
import scala.collection.JavaConversions._
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
@@ -503,7 +502,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
val params = Map(
"path" -> basePath,
DataSourceWriteOptions.TABLE_NAME_OPT_KEY -> "test_hoodie",
- DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition"
+ DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition",
+ DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX -> "true"
)
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
val newParams = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter,
@@ -518,6 +518,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
new Path(basePath), newParams).asInstanceOf[HiveSyncConfig]
+ assertTrue(hiveSyncConfig.skipROSuffix)
assertResult("spark.sql.sources.provider=hudi\n" +
"spark.sql.sources.schema.partCol.0=partition\n" +
"spark.sql.sources.schema.numParts=1\n" +
@@ -530,6 +531,37 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
assertResult("path=/tmp/hoodie_test")(hiveSyncConfig.serdeProperties)
}
+ test("Test build sync config for skip Ro Suffix vals") {
+ initSparkContext("test build sync config for skip Ro suffix vals")
+ val addSqlTablePropertiesMethod =
+ HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties",
+ classOf[SQLConf], classOf[StructType], classOf[Map[_, _]])
+ addSqlTablePropertiesMethod.setAccessible(true)
+
+ val schema = DataSourceTestUtils.getStructTypeExampleSchema
+ val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+ val basePath = "/tmp/hoodie_test"
+ val params = Map(
+ "path" -> basePath,
+ DataSourceWriteOptions.TABLE_NAME_OPT_KEY -> "test_hoodie",
+ DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition"
+ )
+ val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
+ val newParams = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter,
+ spark.sessionState.conf, structType, parameters)
+ .asInstanceOf[Map[String, String]]
+
+ val buildSyncConfigMethod =
+ HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path],
+ classOf[Map[_, _]])
+ buildSyncConfigMethod.setAccessible(true)
+
+ val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
+ new Path(basePath), newParams).asInstanceOf[HiveSyncConfig]
+
+ assertFalse(hiveSyncConfig.skipROSuffix)
+ }
+
case class Test(uuid: String, ts: Long)
import scala.collection.JavaConverters