Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/14 11:14:56 UTC

[hudi] branch master updated: [HUDI-5773] Support archive command for spark sql (#7931)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 87b97369b68 [HUDI-5773] Support archive command for spark sql (#7931)
87b97369b68 is described below

commit 87b97369b68913ee312db12d373be72f40c93135
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Tue Feb 14 19:14:48 2023 +0800

    [HUDI-5773] Support archive command for spark sql (#7931)
---
 .../org/apache/hudi/cli/ArchiveExecutorUtils.java  | 69 +++++++++++++++++++
 .../procedures/ArchiveCommitsProcedure.scala       | 79 ++++++++++++++++++++++
 .../hudi/command/procedures/HoodieProcedures.scala |  1 +
 .../procedure/TestArchiveCommitsProcedure.scala    | 71 +++++++++++++++++++
 4 files changed, 220 insertions(+)
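
The new procedure is callable directly from Spark SQL. As a minimal sketch mirroring the test case added below (assuming a SparkSession `spark` with the Hudi SQL extensions enabled and an existing Hudi table; `hudi_tbl` is a placeholder name):

    // placeholder table name; parameter values match the test case in this commit
    spark.sql("call archive_commits(table => 'hudi_tbl', min_commits => 2, max_commits => 3, retain_commits => 1)").show()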

diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
new file mode 100644
index 00000000000..0a23c811a01
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli;
+
+import org.apache.hudi.client.HoodieTimelineArchiver;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Archive Utils.
+ */
+public final class ArchiveExecutorUtils {
+  private static final Logger LOG = LogManager.getLogger(ArchiveExecutorUtils.class);
+
+  private ArchiveExecutorUtils() {
+  }
+
+  public static int archive(JavaSparkContext jsc,
+       int minCommits,
+       int maxCommits,
+       int commitsRetained,
+       boolean enableMetadata,
+       String basePath) {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits, maxCommits).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(commitsRetained).build())
+        .withEmbeddedTimelineServerEnabled(false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
+        .build();
+    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
+    HoodieSparkTable<HoodieAvroPayload> table = HoodieSparkTable.create(config, context);
+    try {
+      HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
+      archiver.archiveIfRequired(context, true);
+    } catch (IOException ioe) {
+      LOG.error("Failed to archive with IOException: " + ioe);
+      return -1;
+    }
+    return 0;
+  }
+}
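
The utility above assembles a HoodieWriteConfig from the archival, clean, and metadata settings, creates a HoodieSparkTable, and runs HoodieTimelineArchiver.archiveIfRequired, returning 0 on success and -1 on IOException. A minimal sketch of invoking it directly from Scala, assuming a live SparkSession named `spark` and a placeholder base path:

    import org.apache.hudi.cli.ArchiveExecutorUtils
    import org.apache.spark.api.java.JavaSparkContext

    val jsc = new JavaSparkContext(spark.sparkContext)
    // arguments mirror the method signature above; the base path is a placeholder
    val rc = ArchiveExecutorUtils.archive(
      jsc,
      20,                      // minCommits: instants left on the active timeline
      30,                      // maxCommits: threshold that triggers archiving
      10,                      // commitsRetained: passed to the clean config
      false,                   // enableMetadata: metadata table toggle for this run
      "/path/to/hudi_table")   // basePath (placeholder)
    assert(rc == 0, "archive returned an error code")
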
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
new file mode 100644
index 00000000000..b097c942ad2
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.cli.ArchiveExecutorUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+
+import java.util.function.Supplier
+
+class ArchiveCommitsProcedure extends BaseProcedure
+  with ProcedureBuilder
+  with SparkAdapterSupport
+  with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+    ProcedureParameter.optional(2, "min_commits", DataTypes.IntegerType, 20),
+    ProcedureParameter.optional(3, "max_commits", DataTypes.IntegerType, 30),
+    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, 10),
+    ProcedureParameter.optional(5, "enable_metadata", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.IntegerType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+
+    val minCommits = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+    val maxCommits = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Int]
+    val retainCommits = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Int]
+    val enableMetadata = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Boolean]
+
+    val basePath = getBasePath(tableName, tablePath)
+
+    Seq(Row(ArchiveExecutorUtils.archive(jsc,
+      minCommits,
+      maxCommits,
+      retainCommits,
+      enableMetadata,
+      basePath)))
+  }
+
+  override def build = new ArchiveCommitsProcedure()
+}
+
+object ArchiveCommitsProcedure {
+  val NAME = "archive_commits"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ArchiveCommitsProcedure()
+  }
+}
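
The procedure accepts either a `table` name or a base `path` (getBasePath resolves whichever is supplied) and defaults the tuning parameters to min_commits = 20, max_commits = 30, retain_commits = 10, and enable_metadata = false; the single `result` column carries the 0/-1 return code from ArchiveExecutorUtils. A sketch that leans on those defaults (table name is a placeholder):

    // only the table identifier is given; all other parameters use their declared defaults
    spark.sql("call archive_commits(table => 'hudi_tbl')").show()
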
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index a53fec33fe9..d54c9811925 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -88,6 +88,7 @@ object HoodieProcedures {
       ,(ShowCommitExtraMetadataProcedure.NAME, ShowCommitExtraMetadataProcedure.builder)
       ,(ShowTablePropertiesProcedure.NAME, ShowTablePropertiesProcedure.builder)
       ,(HelpProcedure.NAME, HelpProcedure.builder)
+      ,(ArchiveCommitsProcedure.NAME, ArchiveCommitsProcedure.builder)
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestArchiveCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestArchiveCommitsProcedure.scala
new file mode 100644
index 00000000000..241f9192607
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestArchiveCommitsProcedure.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+class TestArchiveCommitsProcedure extends HoodieSparkProcedureTestBase {
+
+  test("Test Call archive_commits Procedure by Table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           | ) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |   primaryKey = 'id',
+           |   type = 'cow',
+           |   preCombineField = 'ts'
+           | )
+           |""".stripMargin)
+
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 20, 2000)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 30, 3000)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 40, 4000)")
+      spark.sql(s"insert into $tableName values(5, 'a5', 50, 5000)")
+      spark.sql(s"insert into $tableName values(6, 'a6', 60, 6000)")
+
+      val result1 = spark.sql(s"call archive_commits(table => '$tableName'" +
+        s", min_commits => 2, max_commits => 3, retain_commits => 1)")
+        .collect()
+        .map(row => Seq(row.getInt(0)))
+      assertResult(1)(result1.length)
+      assertResult(0)(result1(0).head)
+
+      // collect active commits for table
+      val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
+      assertResult(2) {
+        commits.length
+      }
+
+      // collect archived commits for table
+      val endTs = commits(0).get(0).toString
+      val archivedCommits = spark.sql(s"""call show_archived_commits(table => '$tableName', end_ts => '$endTs')""").collect()
+      assertResult(4) {
+        archivedCommits.length
+      }
+    }
+  }
+}
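
On the expected counts: the six single-row inserts create six commits, and with max_commits = 3 the archiver fires and trims the active timeline down to min_commits = 2, so the remaining four instants land on the archived timeline, which matches the two assertions above. The procedure can equally be addressed by base path instead of table name; a sketch with a placeholder path:

    // path-based variant of the call exercised in the test (path is a placeholder)
    spark.sql("call archive_commits(path => '/tmp/hudi/hudi_tbl', min_commits => 2, max_commits => 3, retain_commits => 1)").show()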