Posted to commits@hudi.apache.org by le...@apache.org on 2020/01/06 14:51:29 UTC
[incubator-hudi] branch master updated: [HUDI-438] Merge duplicated code fragment in HoodieSparkSqlWriter (#1114)
This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2d5b79d [HUDI-438] Merge duplicated code fragment in HoodieSparkSqlWriter (#1114)
2d5b79d is described below
commit 2d5b79d96fa23571da5003fd4460d2d6d3998275
Author: hongdd <jn...@163.com>
AuthorDate: Mon Jan 6 22:51:22 2020 +0800
[HUDI-438] Merge duplicated code fragment in HoodieSparkSqlWriter (#1114)
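
The write and delete paths previously duplicated all of the commit, error-check, and Hive-sync
logic. After this change, each branch only performs its own operation and returns a
(writeStatuses, client) pair; the shared logic moves into a single private helper. A condensed
sketch of the new control flow, not the literal patch (names are taken from the diff below;
client construction and save-mode handling are elided):

    val (writeStatuses, writeClient) =
      if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
        // upsert / insert path
        client.startCommitWithTime(commitTime)
        (DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation), client)
      } else {
        // delete path
        client.startCommitWithTime(commitTime)
        (DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime), client)
      }
    // Error checking, commit, and optional Hive sync now happen exactly once, for both paths.
    val writeSuccessful =
      checkWriteStatus(writeStatuses, parameters, writeClient, commitTime, basePath, operation, jsc)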
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 165 ++++++++-------------
1 file changed, 63 insertions(+), 102 deletions(-)
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index af19e28..62bdd19 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.HoodieRecordPayload
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.util.{FSUtils, TypedProperties}
@@ -74,19 +75,14 @@ private[hudi] object HoodieSparkSqlWriter {
parameters(OPERATION_OPT_KEY)
}
- var writeSuccessful: Boolean = false
- var writeStatuses: JavaRDD[WriteStatus] = null
-
val jsc = new JavaSparkContext(sparkContext)
val basePath = new Path(parameters("path"))
val commitTime = HoodieActiveTimeline.createNewInstantTime();
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
- // Running into issues wrt generic type conversion from Java to Scala. Couldn't make common code paths for
- // write and deletes. Specifically, instantiating client of type HoodieWriteClient<T extends HoodieRecordPayload>
- // is having issues. Hence some codes blocks are same in both if and else blocks.
- if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+ val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
+ if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
// register classes & schemas
val structName = s"${tblName.get}_record"
val nameSpace = s"hoodie.${tblName.get}"
@@ -147,54 +143,8 @@ private[hudi] object HoodieSparkSqlWriter {
(true, common.util.Option.empty())
}
client.startCommitWithTime(commitTime)
- writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
- // Check for errors and commit the write.
- val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
- writeSuccessful =
- if (errorCount == 0) {
- log.info("No errors. Proceeding to commit the write.")
- val metaMap = parameters.filter(kv =>
- kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
- val commitSuccess = if (metaMap.isEmpty) {
- client.commit(commitTime, writeStatuses)
- } else {
- client.commit(commitTime, writeStatuses,
- common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
- }
-
- if (commitSuccess) {
- log.info("Commit " + commitTime + " successful!")
- }
- else {
- log.info("Commit " + commitTime + " failed!")
- }
-
- val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
- val syncHiveSucess = if (hiveSyncEnabled) {
- log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
- val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
- syncHive(basePath, fs, parameters)
- } else {
- true
- }
- client.close()
- commitSuccess && syncHiveSucess
- } else {
- log.error(s"$operation failed with ${errorCount} errors :");
- if (log.isTraceEnabled) {
- log.trace("Printing out the top 100 errors")
- writeStatuses.rdd.filter(ws => ws.hasErrors)
- .take(100)
- .foreach(ws => {
- log.trace("Global error :", ws.getGlobalError)
- if (ws.getErrors.size() > 0) {
- ws.getErrors.foreach(kt =>
- log.trace(s"Error for key: ${kt._1}", kt._2))
- }
- })
- }
- false
- }
+ val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
+ (writeStatuses, client)
} else {
// Handle save modes
@@ -225,55 +175,12 @@ private[hudi] object HoodieSparkSqlWriter {
// Issue deletes
client.startCommitWithTime(commitTime)
- writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
- val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
- writeSuccessful =
- if (errorCount == 0) {
- log.info("No errors. Proceeding to commit the write.")
- val metaMap = parameters.filter(kv =>
- kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
- val commitSuccess = if (metaMap.isEmpty) {
- client.commit(commitTime, writeStatuses)
- } else {
- client.commit(commitTime, writeStatuses,
- common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
- }
-
- if (commitSuccess) {
- log.info("Commit " + commitTime + " successful!")
- }
- else {
- log.info("Commit " + commitTime + " failed!")
- }
-
- val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
- val syncHiveSucess = if (hiveSyncEnabled) {
- log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
- val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
- syncHive(basePath, fs, parameters)
- } else {
- true
- }
- client.close()
- commitSuccess && syncHiveSucess
- } else {
- log.error(s"$operation failed with ${errorCount} errors :");
- if (log.isTraceEnabled) {
- log.trace("Printing out the top 100 errors")
- writeStatuses.rdd.filter(ws => ws.hasErrors)
- .take(100)
- .foreach(ws => {
- log.trace("Global error :", ws.getGlobalError)
- if (ws.getErrors.size() > 0) {
- ws.getErrors.foreach(kt =>
- log.trace(s"Error for key: ${kt._1}", kt._2))
- }
- })
- }
- false
- }
+ val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
+ (writeStatuses, client)
}
+ // Check for errors and commit the write.
+ val writeSuccessful = checkWriteStatus(writeStatuses, parameters, writeClient, commitTime, basePath, operation, jsc)
(writeSuccessful, common.util.Option.ofNullable(commitTime))
}
@@ -340,4 +247,58 @@ private[hudi] object HoodieSparkSqlWriter {
hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
hiveSyncConfig
}
+
+ private def checkWriteStatus(writeStatuses: JavaRDD[WriteStatus],
+ parameters: Map[String, String],
+ client: HoodieWriteClient[_],
+ commitTime: String,
+ basePath: Path,
+ operation: String,
+ jsc: JavaSparkContext): Boolean = {
+ val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
+ if (errorCount == 0) {
+ log.info("No errors. Proceeding to commit the write.")
+ val metaMap = parameters.filter(kv =>
+ kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
+ val commitSuccess = if (metaMap.isEmpty) {
+ client.commit(commitTime, writeStatuses)
+ } else {
+ client.commit(commitTime, writeStatuses,
+ common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
+ }
+
+ if (commitSuccess) {
+ log.info("Commit " + commitTime + " successful!")
+ }
+ else {
+ log.info("Commit " + commitTime + " failed!")
+ }
+
+ val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+ val syncHiveSuccess = if (hiveSyncEnabled) {
+ log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+ val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+ syncHive(basePath, fs, parameters)
+ } else {
+ true
+ }
+ client.close()
+ commitSuccess && syncHiveSuccess
+ } else {
+ log.error(s"$operation failed with ${errorCount} errors :");
+ if (log.isTraceEnabled) {
+ log.trace("Printing out the top 100 errors")
+ writeStatuses.rdd.filter(ws => ws.hasErrors)
+ .take(100)
+ .foreach(ws => {
+ log.trace("Global error :", ws.getGlobalError)
+ if (ws.getErrors.size() > 0) {
+ ws.getErrors.foreach(kt =>
+ log.trace(s"Error for key: ${kt._1}", kt._2))
+ }
+ })
+ }
+ false
+ }
+ }
}
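
A note on the typing: the comment removed by this patch explained that the Java-side bounded
generic HoodieWriteClient<T extends HoodieRecordPayload> was hard to share across the two Scala
branches. The new code works around that in two places: the destructured val ascribes the client
as HoodieWriteClient[HoodieRecordPayload[Nothing]] (Nothing is a subtype of every type, so the
erased-generics ascription typechecks whichever concrete payload each branch produced), and
checkWriteStatus accepts an existential HoodieWriteClient[_], so the helper never has to name T.
A minimal, self-contained illustration of the same trick (Payload and Client are hypothetical
stand-ins, not Hudi classes):

    object ExistentialDemo extends App {
      // Hypothetical stand-ins for the Java-style bounded generic; not Hudi API.
      class Payload
      class Client[T <: Payload] {
        def commit(time: String): Boolean = true
      }

      // An existential Client[_] accepts a client built with any concrete T,
      // so a single helper can serve differently-typed call sites.
      def finish(client: Client[_], time: String): Boolean = client.commit(time)

      println(finish(new Client[Payload], "20200106225122")) // prints: true
    }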