You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in the plain-text conversion.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/02 09:23:44 UTC
[hudi] 02/08: [HUDI-5540] Close write client after usage of DeleteMarker/RollbackToInstantTime/RunClean/RunCompactionProcedure (#7655)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 1ccf37f2287484c83f078a6d2dbb7e61f263640a
Author: StreamingFlames <18...@163.com>
AuthorDate: Wed Feb 1 13:53:19 2023 +0800
[HUDI-5540] Close write client after usage of DeleteMarker/RollbackToInstantTime/RunClean/RunCompactionProcedure (#7655)
---
.../command/procedures/DeleteMarkerProcedure.scala | 8 +-
.../RollbackToInstantTimeProcedure.scala | 50 ++++----
.../command/procedures/RunCleanProcedure.scala | 27 +++--
.../procedures/RunCompactionProcedure.scala | 126 +++++++++++----------
4 files changed, 121 insertions(+), 90 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
index bfbab32599b..d99a5489799 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hudi.command.procedures
+import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.table.HoodieSparkTable
import org.apache.hudi.table.marker.WriteMarkersFactory
import org.apache.spark.internal.Logging
@@ -47,8 +48,9 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log
val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
val basePath = getBasePath(tableName)
+ var client: SparkRDDWriteClient[_] = null
val result = Try {
- val client = createHoodieClient(jsc, basePath)
+ client = createHoodieClient(jsc, basePath)
val config = client.getConfig
val context = client.getEngineContext
val table = HoodieSparkTable.create(config, context)
@@ -63,6 +65,10 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log
false
}
+ if (client != null) {
+ client.close()
+ }
+
Seq(Row(result))
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
index 1fcc665d611..c8109bd56e2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
@@ -52,28 +53,35 @@ class RollbackToInstantTimeProcedure extends BaseProcedure with ProcedureBuilder
val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
val basePath = hoodieCatalogTable.tableLocation
- val client = createHoodieClient(jsc, basePath)
- client.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false")
- val config = getWriteConfig(basePath)
- val metaClient = HoodieTableMetaClient.builder
- .setConf(jsc.hadoopConfiguration)
- .setBasePath(config.getBasePath)
- .setLoadActiveTimelineOnLoad(false)
- .setConsistencyGuardConfig(config.getConsistencyGuardConfig)
- .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion)))
- .build
-
- val activeTimeline = metaClient.getActiveTimeline
- val completedTimeline: HoodieTimeline = activeTimeline.getCommitsTimeline.filterCompletedInstants
- val filteredTimeline = completedTimeline.containsInstant(instantTime)
- if (!filteredTimeline) {
- throw new HoodieException(s"Commit $instantTime not found in Commits $completedTimeline")
+ var client: SparkRDDWriteClient[_] = null
+ try {
+ client = createHoodieClient(jsc, basePath)
+ client.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false")
+ val config = getWriteConfig(basePath)
+ val metaClient = HoodieTableMetaClient.builder
+ .setConf(jsc.hadoopConfiguration)
+ .setBasePath(config.getBasePath)
+ .setLoadActiveTimelineOnLoad(false)
+ .setConsistencyGuardConfig(config.getConsistencyGuardConfig)
+ .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion)))
+ .build
+
+ val activeTimeline = metaClient.getActiveTimeline
+ val completedTimeline: HoodieTimeline = activeTimeline.getCommitsTimeline.filterCompletedInstants
+ val filteredTimeline = completedTimeline.containsInstant(instantTime)
+ if (!filteredTimeline) {
+ throw new HoodieException(s"Commit $instantTime not found in Commits $completedTimeline")
+ }
+
+ val result = if (client.rollback(instantTime)) true else false
+ val outputRow = Row(result)
+
+ Seq(outputRow)
+ } finally {
+ if (client != null) {
+ client.close()
+ }
}
-
- val result = if (client.rollback(instantTime)) true else false
- val outputRow = Row(result)
-
- Seq(outputRow)
}
override def build: Procedure = new RollbackToInstantTimeProcedure()
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
index 36580176d0f..ca8b3fc95bc 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.util.JsonUtils
import org.apache.hudi.config.HoodieCleanConfig
@@ -79,16 +80,24 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() -> getArgValueOrDefault(args, PARAMETERS(7)).get.toString,
HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> getArgValueOrDefault(args, PARAMETERS(8)).get.toString
)
- val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
- val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)
- if (hoodieCleanMeta == null) Seq.empty
- else Seq(Row(hoodieCleanMeta.getStartCleanTime,
- hoodieCleanMeta.getTimeTakenInMillis,
- hoodieCleanMeta.getTotalFilesDeleted,
- hoodieCleanMeta.getEarliestCommitToRetain,
- JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
- hoodieCleanMeta.getVersion))
+ var client: SparkRDDWriteClient[_] = null
+ try {
+ client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
+ val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)
+
+ if (hoodieCleanMeta == null) Seq.empty
+ else Seq(Row(hoodieCleanMeta.getStartCleanTime,
+ hoodieCleanMeta.getTimeTakenInMillis,
+ hoodieCleanMeta.getTotalFilesDeleted,
+ hoodieCleanMeta.getEarliestCommitToRetain,
+ JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
+ hoodieCleanMeta.getVersion))
+ } finally {
+ if (client != null) {
+ client.close()
+ }
+ }
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
index 3c51d7d8b29..d79cf8c302f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hudi.command.procedures
+import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
@@ -64,70 +65,77 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
val basePath = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
- val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty)
-
- var willCompactionInstants: Seq[String] = Seq.empty
- operation match {
- case "schedule" =>
- val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
- if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
- willCompactionInstants = Seq(instantTime)
- }
- case "run" =>
- // Do compaction
- val timeLine = metaClient.getActiveTimeline
- val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala
- .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
- .map(_.getTimestamp)
- .toSeq.sortBy(f => f)
- willCompactionInstants = if (instantTimestamp.isEmpty) {
- if (pendingCompactionInstants.nonEmpty) {
- pendingCompactionInstants
- } else { // If there are no pending compaction, schedule to generate one.
- // CompactionHoodiePathCommand will return instanceTime for SCHEDULE.
- val instantTime = HoodieActiveTimeline.createNewInstantTime()
- if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
- Seq(instantTime)
+
+ var client: SparkRDDWriteClient[_] = null
+ try {
+ client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty)
+ var willCompactionInstants: Seq[String] = Seq.empty
+ operation match {
+ case "schedule" =>
+ val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
+ if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
+ willCompactionInstants = Seq(instantTime)
+ }
+ case "run" =>
+ // Do compaction
+ val timeLine = metaClient.getActiveTimeline
+ val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala
+ .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
+ .map(_.getTimestamp)
+ .toSeq.sortBy(f => f)
+ willCompactionInstants = if (instantTimestamp.isEmpty) {
+ if (pendingCompactionInstants.nonEmpty) {
+ pendingCompactionInstants
+ } else { // If there are no pending compaction, schedule to generate one.
+ // CompactionHoodiePathCommand will return instanceTime for SCHEDULE.
+ val instantTime = HoodieActiveTimeline.createNewInstantTime()
+ if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
+ Seq(instantTime)
+ } else {
+ Seq.empty
+ }
+ }
+ } else {
+ // Check if the compaction timestamp has exists in the pending compaction
+ if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
+ Seq(instantTimestamp.get.toString)
} else {
- Seq.empty
+ throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in " +
+ s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
}
}
- } else {
- // Check if the compaction timestamp has exists in the pending compaction
- if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
- Seq(instantTimestamp.get.toString)
+
+ if (willCompactionInstants.isEmpty) {
+ logInfo(s"No need to compaction on $basePath")
} else {
- throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in " +
- s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
- }
- }
-
- if (willCompactionInstants.isEmpty) {
- logInfo(s"No need to compaction on $basePath")
- } else {
- logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath")
- val timer = HoodieTimer.start
- willCompactionInstants.foreach { compactionInstant =>
- val writeResponse = client.compact(compactionInstant)
- handleResponse(writeResponse.getCommitMetadata.get())
- client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty())
+ logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath")
+ val timer = HoodieTimer.start
+ willCompactionInstants.foreach { compactionInstant =>
+ val writeResponse = client.compact(compactionInstant)
+ handleResponse(writeResponse.getCommitMetadata.get())
+ client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty())
+ }
+ logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
+ s" spend: ${timer.endTimer()}ms")
}
- logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
- s" spend: ${timer.endTimer()}ms")
- }
- case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
- }
-
- val compactionInstants = metaClient.reloadActiveTimeline().getInstantsAsStream.iterator().asScala
- .filter(instant => willCompactionInstants.contains(instant.getTimestamp))
- .toSeq
- .sortBy(p => p.getTimestamp)
- .reverse
-
- compactionInstants.map(instant =>
- (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
- ).map { case (instant, plan) =>
- Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
+ case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
+ }
+
+ val compactionInstants = metaClient.reloadActiveTimeline().getInstantsAsStream.iterator().asScala
+ .filter(instant => willCompactionInstants.contains(instant.getTimestamp))
+ .toSeq
+ .sortBy(p => p.getTimestamp)
+ .reverse
+
+ compactionInstants.map(instant =>
+ (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
+ ).map { case (instant, plan) =>
+ Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
+ }
+ } finally {
+ if (client != null) {
+ client.close()
+ }
}
}