Posted to commits@hudi.apache.org by le...@apache.org on 2023/02/01 05:53:31 UTC

[hudi] branch master updated: [HUDI-5540] Close write client after usage of DeleteMarker/RollbackToInstantTime/RunClean/RunCompactionProcedure (#7655)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 255d40c5fb5 [HUDI-5540] Close write client after usage of DeleteMarker/RollbackToInstantTime/RunClean/RunCompactionProcedure (#7655)
255d40c5fb5 is described below

commit 255d40c5fb5899b7fc05290b8d1c3fcdb24b4ffe
Author: StreamingFlames <18...@163.com>
AuthorDate: Wed Feb 1 13:53:19 2023 +0800

    [HUDI-5540] Close write client after usage of DeleteMarker/RollbackToInstantTime/RunClean/RunCompactionProcedure (#7655)
---
 .../command/procedures/DeleteMarkerProcedure.scala |   8 +-
 .../RollbackToInstantTimeProcedure.scala           |  50 ++++----
 .../command/procedures/RunCleanProcedure.scala     |  27 +++--
 .../procedures/RunCompactionProcedure.scala        | 126 +++++++++++----------
 4 files changed, 121 insertions(+), 90 deletions(-)
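
For readers skimming the diffs below: each of the four procedures previously created a SparkRDDWriteClient and never closed it; this commit makes each of them release the client on every exit path. A minimal sketch of the shared pattern follows. It is illustrative only: createHoodieClient stands in for the BaseProcedure helper the real code calls, and the procedure-specific work is elided.

    import org.apache.hudi.client.SparkRDDWriteClient
    import org.apache.spark.api.java.JavaSparkContext

    def runWithClient[T](jsc: JavaSparkContext, basePath: String)
                        (body: SparkRDDWriteClient[_] => T): T = {
      var client: SparkRDDWriteClient[_] = null
      try {
        client = createHoodieClient(jsc, basePath) // may throw before assignment
        body(client)                               // the procedure-specific work
      } finally {
        if (client != null) {                      // skip close if creation failed
          client.close()
        }
      }
    }

The null check is what makes the pattern safe: if createHoodieClient itself throws, there is no client to release, and unconditionally calling close() would raise a NullPointerException that masks the original error.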

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
index bfbab32599b..d99a5489799 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.table.HoodieSparkTable
 import org.apache.hudi.table.marker.WriteMarkersFactory
 import org.apache.spark.internal.Logging
@@ -47,8 +48,9 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log
     val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
     val basePath = getBasePath(tableName)
 
+    var client: SparkRDDWriteClient[_] = null
     val result = Try {
-      val client = createHoodieClient(jsc, basePath)
+      client = createHoodieClient(jsc, basePath)
       val config = client.getConfig
       val context = client.getEngineContext
       val table = HoodieSparkTable.create(config, context)
@@ -63,6 +65,10 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log
         false
     }
 
+    if (client != null) {
+      client.close()
+    }
+
     Seq(Row(result))
   }
 
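DeleteMarkerProcedure is the one variant that does not need try/finally: its body already runs inside scala.util.Try, which turns any failure into a value rather than a propagating exception, so the close can sit on the straight-line path after the result is computed. A condensed, illustrative sketch of that shape (the marker-deletion work is elided, and createHoodieClient again stands in for the real helper):

    import scala.util.{Failure, Success, Try}

    var client: SparkRDDWriteClient[_] = null
    val result = Try {
      client = createHoodieClient(jsc, basePath)
      // ... delete markers via WriteMarkersFactory ...
      true
    } match {
      case Success(ok) => ok
      case Failure(_)  => false // the failure is folded into the Boolean result
    }
    if (client != null) client.close() // reached whether Try succeeded or failed
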
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
index 1fcc665d611..c8109bd56e2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
@@ -52,28 +53,35 @@ class RollbackToInstantTimeProcedure extends BaseProcedure with ProcedureBuilder
 
     val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
     val basePath = hoodieCatalogTable.tableLocation
-    val client = createHoodieClient(jsc, basePath)
-    client.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false")
-    val config = getWriteConfig(basePath)
-    val metaClient = HoodieTableMetaClient.builder
-      .setConf(jsc.hadoopConfiguration)
-      .setBasePath(config.getBasePath)
-      .setLoadActiveTimelineOnLoad(false)
-      .setConsistencyGuardConfig(config.getConsistencyGuardConfig)
-      .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion)))
-      .build
-
-    val activeTimeline = metaClient.getActiveTimeline
-    val completedTimeline: HoodieTimeline = activeTimeline.getCommitsTimeline.filterCompletedInstants
-    val filteredTimeline = completedTimeline.containsInstant(instantTime)
-    if (!filteredTimeline) {
-      throw new HoodieException(s"Commit $instantTime not found in Commits $completedTimeline")
+    var client: SparkRDDWriteClient[_] = null
+    try {
+      client = createHoodieClient(jsc, basePath)
+      client.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false")
+      val config = getWriteConfig(basePath)
+      val metaClient = HoodieTableMetaClient.builder
+        .setConf(jsc.hadoopConfiguration)
+        .setBasePath(config.getBasePath)
+        .setLoadActiveTimelineOnLoad(false)
+        .setConsistencyGuardConfig(config.getConsistencyGuardConfig)
+        .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion)))
+        .build
+
+      val activeTimeline = metaClient.getActiveTimeline
+      val completedTimeline: HoodieTimeline = activeTimeline.getCommitsTimeline.filterCompletedInstants
+      val filteredTimeline = completedTimeline.containsInstant(instantTime)
+      if (!filteredTimeline) {
+        throw new HoodieException(s"Commit $instantTime not found in Commits $completedTimeline")
+      }
+
+      val result = if (client.rollback(instantTime)) true else false
+      val outputRow = Row(result)
+
+      Seq(outputRow)
+    } finally {
+      if (client != null) {
+        client.close()
+      }
     }
-
-    val result = if (client.rollback(instantTime)) true else false
-    val outputRow = Row(result)
-
-    Seq(outputRow)
   }
 
   override def build: Procedure = new RollbackToInstantTimeProcedure()
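
A Scala detail worth noting in this change and in the clean change below: try/finally is an expression, so the Seq(outputRow) built inside the try block is the value of the whole expression and hence the procedure's return value; the finally clause runs on both the normal and exceptional paths without altering that value. A toy illustration:

    def compute(): Seq[Int] = {
      try {
        Seq(1, 2, 3)       // becomes the value of the try expression
      } finally {
        println("cleanup") // runs after the value is computed, on every path
      }
    }
    // compute() returns Seq(1, 2, 3) and prints "cleanup" exactly once
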
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
index 36580176d0f..ca8b3fc95bc 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.util.JsonUtils
 import org.apache.hudi.config.HoodieCleanConfig
@@ -79,16 +80,24 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
       HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() -> getArgValueOrDefault(args, PARAMETERS(7)).get.toString,
       HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> getArgValueOrDefault(args, PARAMETERS(8)).get.toString
     )
-    val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
-    val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)
 
-    if (hoodieCleanMeta == null) Seq.empty
-    else Seq(Row(hoodieCleanMeta.getStartCleanTime,
-      hoodieCleanMeta.getTimeTakenInMillis,
-      hoodieCleanMeta.getTotalFilesDeleted,
-      hoodieCleanMeta.getEarliestCommitToRetain,
-      JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
-      hoodieCleanMeta.getVersion))
+    var client: SparkRDDWriteClient[_] = null
+    try {
+      client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
+      val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)
+
+      if (hoodieCleanMeta == null) Seq.empty
+      else Seq(Row(hoodieCleanMeta.getStartCleanTime,
+        hoodieCleanMeta.getTimeTakenInMillis,
+        hoodieCleanMeta.getTotalFilesDeleted,
+        hoodieCleanMeta.getEarliestCommitToRetain,
+        JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
+        hoodieCleanMeta.getVersion))
+    } finally {
+      if (client != null) {
+        client.close()
+      }
+    }
   }
 }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
index 3c51d7d8b29..d79cf8c302f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.model.HoodieCommitMetadata
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
@@ -64,70 +65,77 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
 
     val basePath = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
-    val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty)
-
-    var willCompactionInstants: Seq[String] = Seq.empty
-    operation match {
-      case "schedule" =>
-        val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
-        if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
-          willCompactionInstants = Seq(instantTime)
-        }
-      case "run" =>
-        // Do compaction
-        val timeLine = metaClient.getActiveTimeline
-        val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala
-          .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
-          .map(_.getTimestamp)
-          .toSeq.sortBy(f => f)
-        willCompactionInstants = if (instantTimestamp.isEmpty) {
-          if (pendingCompactionInstants.nonEmpty) {
-            pendingCompactionInstants
-          } else { // If there are no pending compaction, schedule to generate one.
-            // CompactionHoodiePathCommand will return instanceTime for SCHEDULE.
-            val instantTime = HoodieActiveTimeline.createNewInstantTime()
-            if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
-              Seq(instantTime)
+
+    var client: SparkRDDWriteClient[_] = null
+    try {
+      client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty)
+      var willCompactionInstants: Seq[String] = Seq.empty
+      operation match {
+        case "schedule" =>
+          val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
+          if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
+            willCompactionInstants = Seq(instantTime)
+          }
+        case "run" =>
+          // Do compaction
+          val timeLine = metaClient.getActiveTimeline
+          val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala
+            .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
+            .map(_.getTimestamp)
+            .toSeq.sortBy(f => f)
+          willCompactionInstants = if (instantTimestamp.isEmpty) {
+            if (pendingCompactionInstants.nonEmpty) {
+              pendingCompactionInstants
+            } else { // If there are no pending compaction, schedule to generate one.
+              // CompactionHoodiePathCommand will return instanceTime for SCHEDULE.
+              val instantTime = HoodieActiveTimeline.createNewInstantTime()
+              if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
+                Seq(instantTime)
+              } else {
+                Seq.empty
+              }
+            }
+          } else {
+            // Check if the compaction timestamp has exists in the pending compaction
+            if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
+              Seq(instantTimestamp.get.toString)
             } else {
-              Seq.empty
+              throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in " +
+                s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
             }
           }
-        } else {
-          // Check if the compaction timestamp has exists in the pending compaction
-          if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
-            Seq(instantTimestamp.get.toString)
+
+          if (willCompactionInstants.isEmpty) {
+            logInfo(s"No need to compaction on $basePath")
           } else {
-            throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in " +
-              s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
-          }
-        }
-
-        if (willCompactionInstants.isEmpty) {
-          logInfo(s"No need to compaction on $basePath")
-        } else {
-          logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath")
-          val timer = HoodieTimer.start
-          willCompactionInstants.foreach { compactionInstant =>
-            val writeResponse = client.compact(compactionInstant)
-            handleResponse(writeResponse.getCommitMetadata.get())
-            client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty())
+            logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath")
+            val timer = HoodieTimer.start
+            willCompactionInstants.foreach { compactionInstant =>
+              val writeResponse = client.compact(compactionInstant)
+              handleResponse(writeResponse.getCommitMetadata.get())
+              client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty())
+            }
+            logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
+              s" spend: ${timer.endTimer()}ms")
           }
-          logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
-            s" spend: ${timer.endTimer()}ms")
-        }
-      case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
-    }
-
-    val compactionInstants = metaClient.reloadActiveTimeline().getInstantsAsStream.iterator().asScala
-      .filter(instant => willCompactionInstants.contains(instant.getTimestamp))
-      .toSeq
-      .sortBy(p => p.getTimestamp)
-      .reverse
-
-    compactionInstants.map(instant =>
-      (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
-    ).map { case (instant, plan) =>
-      Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
+        case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
+      }
+
+      val compactionInstants = metaClient.reloadActiveTimeline().getInstantsAsStream.iterator().asScala
+        .filter(instant => willCompactionInstants.contains(instant.getTimestamp))
+        .toSeq
+        .sortBy(p => p.getTimestamp)
+        .reverse
+
+      compactionInstants.map(instant =>
+        (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
+      ).map { case (instant, plan) =>
+        Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
+      }
+    } finally {
+      if (client != null) {
+        client.close()
+      }
     }
   }
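
As an aside, on Scala 2.13 the same close-on-every-path guarantee can be written with scala.util.Using. A hypothetical sketch, assuming SparkRDDWriteClient implements AutoCloseable (its close() method suggests it does) and again borrowing createHoodieClient as a stand-in for the real helper:

    import org.apache.spark.sql.Row
    import scala.util.Using

    val rows: Seq[Row] = Using.resource(createHoodieClient(jsc, basePath)) { client =>
      // ... procedure body using the client ...
      Seq(Row(true))
    } // close() is invoked even if the body throws

The explicit null-checked finally used in this commit has the advantage of also working on the older Scala versions Hudi builds against, where scala.util.Using is not available.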