Posted to commits@hudi.apache.org by vi...@apache.org on 2021/06/07 23:04:50 UTC

[hudi] branch master updated: [HUDI-1914] Add fetching latest schema to table command in hudi-cli (#2964)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9195909  [HUDI-1914] Add fetching latest schema to table command in hudi-cli (#2964)
9195909 is described below

commit 919590988a4db1b73a67188d8b1b0d971f8b766d
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Mon Jun 7 19:04:35 2021 -0400

    [HUDI-1914] Add fetching latest schema to table command in hudi-cli (#2964)
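
    Once connected to a table in a hudi-cli session, the new command prints the
    latest Avro schema, or writes it to a file when --outputFilePath is given.
    A minimal usage sketch (prompts omitted; the command name, option key, and
    response strings all come from the implementation in the diff below):

        fetch table schema
        fetch table schema --outputFilePath /tmp/schema.avsc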
---
 .../org/apache/hudi/cli/commands/TableCommand.java | 46 ++++++++++++
 .../apache/hudi/cli/commands/TestTableCommand.java | 84 ++++++++++++++++++++++
 .../HoodieTestCommitMetadataGenerator.java         | 32 +++++++--
 3 files changed, 158 insertions(+), 4 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index d25e0c8..d1fd694 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -23,14 +23,21 @@ import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.exception.TableNotFoundException;
 
+import org.apache.avro.Schema;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliCommand;
 import org.springframework.shell.core.annotation.CliOption;
 import org.springframework.stereotype.Component;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -41,6 +48,8 @@ import java.util.List;
 @Component
 public class TableCommand implements CommandMarker {
 
+  private static final Logger LOG = LogManager.getLogger(TableCommand.class);
+
   static {
     System.out.println("Table command getting loaded");
   }
@@ -142,4 +151,41 @@ public class TableCommand implements CommandMarker {
     HoodieCLI.refreshTableMetadata();
     return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " refreshed.";
   }
+
+  /**
+   * Fetches the latest table schema in Avro format.
+   */
+  @CliCommand(value = "fetch table schema", help = "Fetches latest table schema")
+  public String fetchTableSchema(
+      @CliOption(key = {"outputFilePath"}, mandatory = false, help = "File path to write schema") final String outputFilePath) throws Exception {
+    HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(client);
+    Schema schema = tableSchemaResolver.getTableAvroSchema();
+    if (outputFilePath != null) {
+      LOG.info("Latest table schema: " + schema.toString(true));
+      writeToFile(outputFilePath, schema.toString(true));
+      return String.format("Latest table schema written to %s", outputFilePath);
+    } else {
+      return String.format("Latest table schema %s", schema.toString(true));
+    }
+  }
+
+  /**
+   * Writes the given string to a file, replacing any existing file at that path.
+   *
+   * @param filePath output file path.
+   * @param data data to be written to the file.
+   */
+  private static void writeToFile(String filePath, String data) throws IOException {
+    File outFile = new File(filePath);
+    if (outFile.exists()) {
+      outFile.delete();
+    }
+    // try-with-resources closes the stream even if the write fails; writing the
+    // full UTF-8 byte array (rather than data.length() chars) avoids truncating
+    // multi-byte characters.
+    try (OutputStream os = new FileOutputStream(outFile)) {
+      os.write(data.getBytes(StandardCharsets.UTF_8));
+    }
+  }
 }
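
For readers calling this outside the CLI, the command's core is TableSchemaResolver.
A minimal standalone sketch follows; the builder-style meta client construction and
the sample base path are assumptions for illustration, not part of this diff:

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.TableSchemaResolver;

    public class FetchSchemaExample {
      public static void main(String[] args) throws Exception {
        // Hypothetical base path; point this at an existing Hudi table.
        String basePath = "file:///tmp/hudi/sample-table";
        HoodieTableMetaClient client = HoodieTableMetaClient.builder()
            .setConf(new Configuration())
            .setBasePath(basePath)
            .build();
        // Same call the CLI command makes: resolve the latest Avro schema
        // from the table's metadata.
        Schema schema = new TableSchemaResolver(client).getTableAvroSchema();
        System.out.println(schema.toString(true));
      }
    }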
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
index cdf9db3..fe3407f 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
@@ -18,14 +18,20 @@
 
 package org.apache.hudi.cli.commands;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
 
+import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeEach;
@@ -33,9 +39,13 @@ import org.junit.jupiter.api.Test;
 import org.springframework.shell.core.CommandResult;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -192,4 +202,78 @@ public class TestTableCommand extends AbstractShellIntegrationTest {
     // After refresh, there are 4 instants
     assertEquals(4, timeline.countInstants(), "there should be 4 instants");
   }
+
+  @Test
+  public void testFetchTableSchema() throws Exception {
+    // Create table and connect
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+    new TableCommand().createTable(
+        tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+    metaClient = HoodieCLI.getTableMetaClient();
+
+    String schemaStr = "{\n"
+        + "         \"type\" : \"record\",\n"
+        + "         \"name\" : \"SchemaName\",\n"
+        + "         \"namespace\" : \"SchemaNS\",\n"
+        + "         \"fields\" : [ {\n"
+        + "           \"name\" : \"key\",\n"
+        + "           \"type\" : \"int\"\n"
+        + "         }, {\n"
+        + "           \"name\" : \"val\",\n"
+        + "           \"type\" : [ \"null\", \"string\" ],\n"
+        + "           \"default\" : null\n"
+        + "         }]};";
+
+    generateData(schemaStr);
+
+    CommandResult cr = getShell().executeCommand("fetch table schema");
+    assertTrue(cr.isSuccess());
+
+    String result = cr.getResult().toString();
+    String actualSchemaStr = result.substring(result.indexOf("{"));
+    Schema actualSchema = new Schema.Parser().parse(actualSchemaStr);
+
+    Schema expectedSchema = new Schema.Parser().parse(schemaStr);
+    expectedSchema = HoodieAvroUtils.addMetadataFields(expectedSchema);
+    assertEquals(expectedSchema, actualSchema);
+
+    File file = File.createTempFile("temp", null);
+    cr = getShell().executeCommand("fetch table schema --outputFilePath " + file.getAbsolutePath());
+    assertTrue(cr.isSuccess());
+
+    actualSchemaStr = getFileContent(file.getAbsolutePath());
+    actualSchema = new Schema.Parser().parse(actualSchemaStr);
+    assertEquals(expectedSchema, actualSchema);
+  }
+
+  private LinkedHashMap<String, Integer[]> generateData(String schemaStr) throws Exception {
+    // generate data and metadata
+    LinkedHashMap<String, Integer[]> data = new LinkedHashMap<>();
+    data.put("102", new Integer[] {15, 10});
+    data.put("101", new Integer[] {20, 10});
+    data.put("100", new Integer[] {15, 15});
+    for (Map.Entry<String, Integer[]> entry : data.entrySet()) {
+      String key = entry.getKey();
+      Integer[] value = entry.getValue();
+      HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key, HoodieCLI.conf,
+          Option.of(value[0]), Option.of(value[1]), Collections.singletonMap(HoodieCommitMetadata.SCHEMA_KEY, schemaStr));
+    }
+
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+    assertEquals(3, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(),
+        "There should have 3 commits");
+    return data;
+  }
+
+  private String getFileContent(String fileToReadStr) throws IOException {
+    File fileToRead = new File(fileToReadStr);
+    if (!fileToRead.exists()) {
+      throw new IllegalStateException("Outfile " + fileToReadStr + " not found");
+    }
+    // Files.readAllBytes reads the whole file and closes the underlying stream,
+    // avoiding the short-read risk of a single InputStream.read(byte[]) call.
+    return new String(Files.readAllBytes(fileToRead.toPath()), StandardCharsets.UTF_8);
+  }
 }
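
The generator changes below exist so the test can stamp a schema into each commit's
metadata. Stripped of the overload plumbing, the essential step is attaching extra
key/value metadata before serializing; a minimal sketch using only types that appear
in this diff (the sample schema string is illustrative):

    import org.apache.hudi.common.model.HoodieCommitMetadata;

    public class ExtraMetadataExample {
      public static void main(String[] args) throws Exception {
        // Illustrative writer schema; the test above uses a richer record schema.
        String schemaStr = "{\"type\":\"record\",\"name\":\"SchemaName\","
            + "\"fields\":[{\"name\":\"key\",\"type\":\"int\"}]}";
        // Store the schema under SCHEMA_KEY; TableSchemaResolver can later
        // recover it from the latest commit, which is what the test verifies.
        HoodieCommitMetadata metadata = new HoodieCommitMetadata();
        metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaStr);
        System.out.println(metadata.toJsonString());
      }
    }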
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
index c33bb26..105a9f6 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,17 +69,27 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
 
   public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
       Option<Integer> writes, Option<Integer> updates) throws Exception {
+    createCommitFileWithMetadata(basePath, commitTime, configuration, writes, updates, Collections.emptyMap());
+  }
+
+  public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
+      Option<Integer> writes, Option<Integer> updates, Map<String, String> extraMetadata) throws Exception {
     createCommitFileWithMetadata(basePath, commitTime, configuration, UUID.randomUUID().toString(),
-        UUID.randomUUID().toString(), writes, updates);
+        UUID.randomUUID().toString(), writes, updates, extraMetadata);
   }
 
   public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
       String fileId1, String fileId2, Option<Integer> writes, Option<Integer> updates) throws Exception {
+    createCommitFileWithMetadata(basePath, commitTime, configuration, fileId1, fileId2, writes, updates, Collections.emptyMap());
+  }
+
+  public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
+      String fileId1, String fileId2, Option<Integer> writes, Option<Integer> updates, Map<String, String> extraMetadata) throws Exception {
     List<String> commitFileNames = Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
         HoodieTimeline.makeRequestedCommitFileName(commitTime));
     for (String name : commitFileNames) {
       HoodieCommitMetadata commitMetadata =
-              generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates);
+              generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates, extraMetadata);
       String content = commitMetadata.toJsonString();
       createFileWithMetadata(basePath, configuration, name, content);
     }
@@ -106,6 +117,11 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
 
   public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime, String fileId1,
       String fileId2, Option<Integer> writes, Option<Integer> updates) throws Exception {
+    return generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates, Collections.emptyMap());
+  }
+
+  public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime, String fileId1,
+      String fileId2, Option<Integer> writes, Option<Integer> updates, Map<String, String> extraMetadata) throws Exception {
     FileCreateUtils.createBaseFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime, fileId1);
     FileCreateUtils.createBaseFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime, fileId2);
     return generateCommitMetadata(new HashMap<String, List<String>>() {
@@ -113,15 +129,23 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator {
         put(DEFAULT_FIRST_PARTITION_PATH, createImmutableList(baseFileName(DEFAULT_FIRST_PARTITION_PATH, fileId1)));
         put(DEFAULT_SECOND_PARTITION_PATH, createImmutableList(baseFileName(DEFAULT_SECOND_PARTITION_PATH, fileId2)));
       }
-    }, writes, updates);
+    }, writes, updates, extraMetadata);
+  }
+
+  private static HoodieCommitMetadata generateCommitMetadata(Map<String, List<String>> partitionToFilePaths,
+      Option<Integer> writes, Option<Integer> updates) {
+    return generateCommitMetadata(partitionToFilePaths, writes, updates, Collections.emptyMap());
   }
 
   /**
    * Method to generate commit metadata.
    */
   private static HoodieCommitMetadata generateCommitMetadata(Map<String, List<String>> partitionToFilePaths,
-      Option<Integer> writes, Option<Integer> updates) {
+      Option<Integer> writes, Option<Integer> updates, Map<String, String> extraMetadata) {
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    for (Map.Entry<String, String> entry: extraMetadata.entrySet()) {
+      metadata.addMetadata(entry.getKey(), entry.getValue());
+    }
     partitionToFilePaths.forEach((key, value) -> value.forEach(f -> {
       HoodieWriteStat writeStat = new HoodieWriteStat();
       writeStat.setPartitionPath(key);