Posted to commits@hudi.apache.org by me...@apache.org on 2022/06/19 10:48:31 UTC

[hudi] branch master updated: [HUDI-3507] Support export command based on Call Produce Command (#5901)

This is an automated email from the ASF dual-hosted git repository.

mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c5c4cfec91 [HUDI-3507] Support export command based on Call Produce Command (#5901)
c5c4cfec91 is described below

commit c5c4cfec915fad198ccb57d00bc8d875a78e794b
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Sun Jun 19 18:48:22 2022 +0800

    [HUDI-3507] Support export command based on Call Produce Command (#5901)
---
 .../apache/hudi/cli/commands/ExportCommand.java    |  36 ++--
 .../procedures/ExportInstantsProcedure.scala       | 239 +++++++++++++++++++++
 .../hudi/command/procedures/HoodieProcedures.scala |   1 +
 .../procedure/TestExportInstantsProcedure.scala    |  52 +++++
 4 files changed, 309 insertions(+), 19 deletions(-)
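
The new procedure is invoked through Hudi's Call Procedure support in Spark SQL.
A minimal invocation, modeled on the test added in this commit and assuming a
SparkSession named spark with the Hudi SQL extensions enabled and an existing
Hudi table (the table name and target folder below are illustrative), is:

    // Sketch only: export completed instants of the table to a local directory.
    // 'hudi_tbl' and '/tmp/hudi-export' are placeholder values.
    spark.sql("call export_instants(table => 'hudi_tbl', localFolder => '/tmp/hudi-export')")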

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
index fa6e15b7af..95e7caa8a9 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
@@ -18,6 +18,12 @@
 
 package org.apache.hudi.cli.commands;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -36,14 +42,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.exception.HoodieException;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.specific.SpecificData;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliCommand;
 import org.springframework.shell.core.annotation.CliOption;
@@ -60,10 +58,10 @@ import java.util.stream.Collectors;
 
 /**
  * CLI commands to export various information from a HUDI dataset.
- *
+ * <p>
  * "export instants": Export Instants and their metadata from the Timeline to a local
- *                    directory specified by the parameter --localFolder
- *      The instants are exported in the json format.
+ * directory specified by the parameter --localFolder
+ * The instants are exported in the json format.
  */
 @Component
 public class ExportCommand implements CommandMarker {
@@ -83,7 +81,7 @@ public class ExportCommand implements CommandMarker {
     int numExports = limit == -1 ? Integer.MAX_VALUE : limit;
     int numCopied = 0;
 
-    if (! new File(localFolder).isDirectory()) {
+    if (!new File(localFolder).isDirectory()) {
       throw new HoodieException(localFolder + " is not a valid local directory");
     }
 
@@ -94,7 +92,7 @@ public class ExportCommand implements CommandMarker {
 
     // Archived instants are in the commit archive files
     FileStatus[] statuses = FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(archivePath);
-    List<FileStatus> archivedStatuses = Arrays.stream(statuses).sorted((f1, f2) -> (int)(f1.getModificationTime() - f2.getModificationTime())).collect(Collectors.toList());
+    List<FileStatus> archivedStatuses = Arrays.stream(statuses).sorted((f1, f2) -> (int) (f1.getModificationTime() - f2.getModificationTime())).collect(Collectors.toList());
 
     if (descending) {
       Collections.reverse(nonArchivedInstants);
@@ -115,11 +113,11 @@ public class ExportCommand implements CommandMarker {
 
   private int copyArchivedInstants(List<FileStatus> statuses, Set<String> actionSet, int limit, String localFolder) throws Exception {
     int copyCount = 0;
+    FileSystem fileSystem = FSUtils.getFs(HoodieCLI.getTableMetaClient().getBasePath(), HoodieCLI.conf);
 
     for (FileStatus fs : statuses) {
       // read the archived file
-      Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(HoodieCLI.getTableMetaClient().getBasePath(), HoodieCLI.conf),
-          new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
+      Reader reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
 
       // read the avro blocks
       while (reader.hasNext() && copyCount < limit) {
@@ -130,7 +128,7 @@ public class ExportCommand implements CommandMarker {
             // Archived instants are saved as Avro-encoded HoodieArchivedMetaEntry records. We need to get the
             // metadata record from the entry and convert it to json.
             HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
-                    .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
+                .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
             final String action = archiveEntryRecord.get("actionType").toString();
             if (!actionSet.contains(action)) {
               continue;
@@ -157,7 +155,7 @@ public class ExportCommand implements CommandMarker {
               default:
                 throw new HoodieException("Unknown type of action " + action);
             }
-            
+
             final String instantTime = archiveEntryRecord.get("commitTime").toString();
             final String outPath = localFolder + Path.SEPARATOR + instantTime + "." + action;
             writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true));
@@ -178,9 +176,8 @@ public class ExportCommand implements CommandMarker {
     int copyCount = 0;
 
     if (instants.isEmpty()) {
-      return limit;
+      return copyCount;
     }
-    final Logger LOG = LogManager.getLogger(ExportCommand.class);
 
     final HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
     final HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
@@ -221,6 +218,7 @@ public class ExportCommand implements CommandMarker {
 
       if (data != null) {
         writeToFile(localPath, data);
+        copyCount = copyCount + 1;
       }
     }
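
The class javadoc above also documents the pre-existing hudi-cli entry point: an
"export instants" command that writes instants to the directory given by the
--localFolder parameter. A minimal CLI invocation from a hudi-cli session already
connected to the table (directory value illustrative) would be:

    export instants --localFolder /tmp/hudi-export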
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
new file mode 100644
index 0000000000..ff6ab92179
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.specific.SpecificData
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.log.HoodieLogFormat
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, TimelineMetadataUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.io.File
+import java.util
+import java.util.Collections
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+import scala.util.control.Breaks.break
+
+class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  var sortByFieldParameter: ProcedureParameter = _
+
+  val defaultActions = "clean,commit,deltacommit,rollback,savepoint,restore"
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "localFolder", DataTypes.StringType, None),
+    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, -1),
+    ProcedureParameter.optional(3, "actions", DataTypes.StringType, defaultActions),
+    ProcedureParameter.optional(4, "desc", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("export_detail", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val localFolder = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+    val actions: String = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
+    val desc = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean]
+
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val basePath = hoodieCatalogTable.tableLocation
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val archivePath = new Path(basePath + "/.hoodie/.commits_.archive*")
+    val actionSet: util.Set[String] = Set(actions.split(","): _*).asJava
+    val numExports = if (limit == -1) Integer.MAX_VALUE else limit
+    var numCopied = 0
+
+    if (!new File(localFolder).isDirectory) throw new HoodieException(localFolder + " is not a valid local directory")
+
+    // The non archived instants can be listed from the Timeline.
+    val nonArchivedInstants: util.List[HoodieInstant] = metaClient
+      .getActiveTimeline
+      .filterCompletedInstants.getInstants.iterator().asScala
+      .filter((i: HoodieInstant) => actionSet.contains(i.getAction))
+      .toList.asJava
+
+    // Archived instants are in the commit archive files
+    val statuses: Array[FileStatus] = FSUtils.getFs(basePath, jsc.hadoopConfiguration()).globStatus(archivePath)
+    val archivedStatuses = List(statuses: _*)
+      .sortWith((f1, f2) => (f1.getModificationTime - f2.getModificationTime).toInt > 0).asJava
+
+    if (desc) {
+      Collections.reverse(nonArchivedInstants)
+      numCopied = copyNonArchivedInstants(metaClient, nonArchivedInstants, numExports, localFolder)
+      if (numCopied < numExports) {
+        Collections.reverse(archivedStatuses)
+        numCopied += copyArchivedInstants(basePath, archivedStatuses, actionSet, numExports - numCopied, localFolder)
+      }
+    } else {
+      numCopied = copyArchivedInstants(basePath, archivedStatuses, actionSet, numExports, localFolder)
+      if (numCopied < numExports) numCopied += copyNonArchivedInstants(metaClient, nonArchivedInstants, numExports - numCopied, localFolder)
+    }
+
+    Seq(Row("Exported " + numCopied + " Instants to " + localFolder))
+  }
+
+  @throws[Exception]
+  private def copyArchivedInstants(basePath: String, statuses: util.List[FileStatus], actionSet: util.Set[String], limit: Int, localFolder: String) = {
+    import scala.collection.JavaConversions._
+    var copyCount = 0
+    val fileSystem = FSUtils.getFs(basePath, jsc.hadoopConfiguration())
+    for (fs <- statuses) {
+      // read the archived file
+      val reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(fs.getPath), HoodieArchivedMetaEntry.getClassSchema)
+      // read the avro blocks
+      while ( {
+        reader.hasNext && copyCount < limit
+      }) {
+        val blk = reader.next.asInstanceOf[HoodieAvroDataBlock]
+        try {
+          val recordItr = blk.getRecordIterator
+          try while ( {
+            recordItr.hasNext
+          }) {
+            val ir = recordItr.next
+            // Archived instants are saved as Avro-encoded HoodieArchivedMetaEntry records. We need to get the
+            // metadata record from the entry and convert it to json.
+            val archiveEntryRecord = SpecificData.get.deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir).asInstanceOf[HoodieArchivedMetaEntry]
+            val action = archiveEntryRecord.get("actionType").toString
+            if (!actionSet.contains(action)) break() //todo: continue is not supported
+            val metadata: GenericRecord = action match {
+              case HoodieTimeline.CLEAN_ACTION =>
+                archiveEntryRecord.getHoodieCleanMetadata
+
+              case HoodieTimeline.COMMIT_ACTION =>
+                archiveEntryRecord.getHoodieCommitMetadata
+
+              case HoodieTimeline.DELTA_COMMIT_ACTION =>
+                archiveEntryRecord.getHoodieCommitMetadata
+
+              case HoodieTimeline.ROLLBACK_ACTION =>
+                archiveEntryRecord.getHoodieRollbackMetadata
+
+              case HoodieTimeline.SAVEPOINT_ACTION =>
+                archiveEntryRecord.getHoodieSavePointMetadata
+
+              case HoodieTimeline.COMPACTION_ACTION =>
+                archiveEntryRecord.getHoodieCompactionMetadata
+
+              case _ => logInfo("Unknown type of action " + action)
+                null
+            }
+            val instantTime = archiveEntryRecord.get("commitTime").toString
+            val outPath = localFolder + Path.SEPARATOR + instantTime + "." + action
+            if (metadata != null) writeToFile(fileSystem, outPath, HoodieAvroUtils.avroToJson(metadata, true))
+            if ( {
+              copyCount += 1;
+              copyCount
+            } == limit) break //todo: break is not supported
+          }
+          finally if (recordItr != null) recordItr.close()
+        }
+      }
+      reader.close()
+    }
+    copyCount
+  }
+
+  @throws[Exception]
+  private def copyNonArchivedInstants(metaClient: HoodieTableMetaClient, instants: util.List[HoodieInstant], limit: Int, localFolder: String): Int = {
+    import scala.collection.JavaConversions._
+    var copyCount = 0
+    if (instants.nonEmpty) {
+      val timeline = metaClient.getActiveTimeline
+      val fileSystem = FSUtils.getFs(metaClient.getBasePath, jsc.hadoopConfiguration())
+      for (instant <- instants) {
+        val localPath = localFolder + Path.SEPARATOR + instant.getFileName
+        val data: Array[Byte] = instant.getAction match {
+          case HoodieTimeline.CLEAN_ACTION =>
+            val metadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(instant).get)
+            HoodieAvroUtils.avroToJson(metadata, true)
+
+          case HoodieTimeline.DELTA_COMMIT_ACTION =>
+            // Already in json format
+            timeline.getInstantDetails(instant).get
+
+          case HoodieTimeline.COMMIT_ACTION =>
+            // Already in json format
+            timeline.getInstantDetails(instant).get
+
+          case HoodieTimeline.COMPACTION_ACTION =>
+            // Already in json format
+            timeline.getInstantDetails(instant).get
+
+          case HoodieTimeline.ROLLBACK_ACTION =>
+            val metadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(timeline.getInstantDetails(instant).get)
+            HoodieAvroUtils.avroToJson(metadata, true)
+
+          case HoodieTimeline.SAVEPOINT_ACTION =>
+            val metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(timeline.getInstantDetails(instant).get)
+            HoodieAvroUtils.avroToJson(metadata, true)
+
+          case _ => null
+
+        }
+        if (data != null) {
+          writeToFile(fileSystem, localPath, data)
+          copyCount = copyCount + 1
+        }
+      }
+    }
+    copyCount
+  }
+
+  @throws[Exception]
+  private def writeToFile(fs: FileSystem, path: String, data: Array[Byte]): Unit = {
+    val out = fs.create(new Path(path))
+    out.write(data)
+    out.flush()
+    out.close()
+  }
+
+  override def build = new ExportInstantsProcedure()
+}
+
+object ExportInstantsProcedure {
+  val NAME = "export_instants"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ExportInstantsProcedure()
+  }
+}
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 7cfeaaa0b6..ff129964fa 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -47,6 +47,7 @@ object HoodieProcedures {
     mapBuilder.put(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder)
     mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
     mapBuilder.put(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
+    mapBuilder.put(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
     mapBuilder.build
   }
 }
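
Once registered here under ExportInstantsProcedure.NAME, the procedure is callable
as export_instants. A fuller sketch using the optional parameters declared in the
PARAMETERS array of ExportInstantsProcedure (values below are illustrative; the
declared defaults are limit = -1, actions = "clean,commit,deltacommit,rollback,savepoint,restore",
desc = false):

    // Sketch only: limit/actions/desc values are placeholders.
    spark.sql("call export_instants(table => 'hudi_tbl', localFolder => '/tmp/hudi-export', " +
      "limit => 10, actions => 'commit,deltacommit', desc => true)")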
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestExportInstantsProcedure.scala
new file mode 100644
index 0000000000..b234126930
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestExportInstantsProcedure.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestExportInstantsProcedure extends HoodieSparkSqlTestBase {
+
+  test("Test Call export_instants Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+
+      val result = spark.sql(s"""call export_instants(table => '$tableName', localFolder => '${tmp.getCanonicalPath}/$tableName')""").limit(1).collect()
+      assertResult(1) {
+        result.length
+      }
+    }
+  }
+}