Posted to commits@hudi.apache.org by vi...@apache.org on 2020/04/29 10:47:42 UTC

[incubator-hudi] branch master updated: [HUDI-702] Add test for HoodieLogFileCommand (#1522)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9059bce  [HUDI-702] Add test for HoodieLogFileCommand (#1522)
9059bce is described below

commit 9059bce977cee98e2d65769622c46a1941c563dd
Author: hongdd <jn...@163.com>
AuthorDate: Wed Apr 29 18:47:27 2020 +0800

    [HUDI-702] Add test for HoodieLogFileCommand (#1522)
---
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |  12 +-
 .../hudi/cli/commands/HoodieLogFileCommand.java    |  19 +-
 .../cli/commands/TestHoodieLogFileCommand.java     | 222 +++++++++++++++++++++
 3 files changed, 246 insertions(+), 7 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index 9256493..3b398e3 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -26,16 +26,26 @@ public class HoodieTableHeaderFields {
   public static final String HEADER_PARTITION_PATH = HEADER_PARTITION + " Path";
   public static final String HEADER_FILE_ID = "FileId";
   public static final String HEADER_BASE_INSTANT = "Base-Instant";
-
+  public static final String HEADER_INSTANT_TIME = "InstantTime";
   public static final String HEADER_CLEAN_TIME = "CleanTime";
   public static final String HEADER_EARLIEST_COMMAND_RETAINED = "EarliestCommandRetained";
   public static final String HEADER_CLEANING_POLICY = "Cleaning policy";
+
   public static final String HEADER_TOTAL_FILES_DELETED = "Total Files Deleted";
   public static final String HEADER_TOTAL_FILES_SUCCESSFULLY_DELETED = "Total Files Successfully Deleted";
   public static final String HEADER_TOTAL_FAILED_DELETIONS = "Total Failed Deletions";
   public static final String HEADER_TOTAL_TIME_TAKEN = "Total Time Taken";
 
   /**
+   * Fields of log file.
+   */
+  public static final String HEADER_RECORDS = "Records";
+  public static final String HEADER_RECORD_COUNT = "RecordCount";
+  public static final String HEADER_BLOCK_TYPE = "BlockType";
+  public static final String HEADER_HEADER_METADATA = "HeaderMetadata";
+  public static final String HEADER_FOOTER_METADATA = "FooterMetadata";
+
+  /**
    * Fields of data header.
    */
   public static final String HEADER_DATA_FILE = "Data-File";
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index db1ab16..f7c4d65 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -20,6 +20,7 @@ package org.apache.hudi.cli.commands;
 
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -53,6 +54,7 @@ import org.springframework.stereotype.Component;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -134,7 +136,6 @@ public class HoodieLogFileCommand implements CommandMarker {
       reader.close();
     }
     List<Comparable[]> rows = new ArrayList<>();
-    int i = 0;
     ObjectMapper objectMapper = new ObjectMapper();
     for (Map.Entry<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> entry : commitCountAndMetadata
         .entrySet()) {
@@ -148,12 +149,14 @@ public class HoodieLogFileCommand implements CommandMarker {
         output[3] = objectMapper.writeValueAsString(tuple3._2()._1());
         output[4] = objectMapper.writeValueAsString(tuple3._2()._2());
         rows.add(output);
-        i++;
       }
     }
 
-    TableHeader header = new TableHeader().addTableHeaderField("InstantTime").addTableHeaderField("RecordCount")
-        .addTableHeaderField("BlockType").addTableHeaderField("HeaderMetadata").addTableHeaderField("FooterMetadata");
+    TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_RECORD_COUNT)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_BLOCK_TYPE)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_HEADER_METADATA)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_FOOTER_METADATA);
 
     return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
   }
@@ -173,7 +176,11 @@ public class HoodieLogFileCommand implements CommandMarker {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     FileSystem fs = client.getFs();
     List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
-        .map(status -> status.getPath().toString()).collect(Collectors.toList());
+        .map(status -> status.getPath().toString()).sorted(Comparator.reverseOrder())
+        .collect(Collectors.toList());
+
+    // logFilePaths must not be empty
+    assert logFilePaths.size() > 0 : "There is no log file";
 
     // TODO : readerSchema can change across blocks/log files, fix this inside Scanner
     AvroSchemaConverter converter = new AvroSchemaConverter();
@@ -232,6 +239,6 @@ public class HoodieLogFileCommand implements CommandMarker {
       rows[i] = data;
       i++;
     }
-    return HoodiePrintHelper.print(new String[] {"Records"}, rows);
+    return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_RECORDS}, rows);
   }
 }
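
(For context, a standalone sketch of the ordering-and-guard pattern the hunk
above introduces. This is a minimal illustration rather than Hudi code: the
path strings are hypothetical stand-ins for what a FileSystem glob would
return, and only java.util is used, so it compiles and runs on its own.)

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    public class SortedLogPathsSketch {
      public static void main(String[] args) {
        // Hypothetical log-file names standing in for a real glob result.
        String[] globbed = {".fileid1.log.1", ".fileid1.log.3", ".fileid1.log.2"};

        List<String> logFilePaths = Arrays.stream(globbed)
            .sorted(Comparator.reverseOrder())
            .collect(Collectors.toList());

        // Same guard as the patch: fail fast (run with -ea) if the glob matched nothing.
        assert !logFilePaths.isEmpty() : "There is no log file";

        // Reverse lexicographic order puts the highest log version first.
        logFilePaths.forEach(System.out::println);
      }
    }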
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
new file mode 100644
index 0000000..b0d2504
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieMemoryConfig;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.SchemaTestUtil.getSimpleSchema;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test Cases for {@link HoodieLogFileCommand}.
+ */
+public class TestHoodieLogFileCommand extends AbstractShellIntegrationTest {
+
+  private String partitionPath;
+  private HoodieAvroDataBlock dataBlock;
+  private String tablePath;
+
+  private static final String INSTANT_TIME = "100";
+
+  @BeforeEach
+  public void init() throws IOException, InterruptedException, URISyntaxException {
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    String tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+    partitionPath = tablePath + File.separator + HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    new TableCommand().createTable(
+        tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    Files.createDirectories(Paths.get(partitionPath));
+
+    HoodieLogFormat.Writer writer = null;
+    try {
+      writer =
+          HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionPath))
+              .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+              .withFileId("test-log-fileid1").overBaseCommit("100").withFs(fs).build();
+
+      // write data to file
+      List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+      dataBlock = new HoodieAvroDataBlock(records, header);
+      writer = writer.appendBlock(dataBlock);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+  }
+
+  /**
+   * Test case for 'show logfile metadata'.
+   */
+  @Test
+  public void testShowLogFileCommits() throws JsonProcessingException {
+    CommandResult cr = getShell().executeCommand("show logfile metadata --logFilePathPattern " + partitionPath + "/*");
+    assertTrue(cr.isSuccess());
+
+    TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_RECORD_COUNT)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_BLOCK_TYPE)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_HEADER_METADATA)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_FOOTER_METADATA);
+
+    // construct the expected result; there is only 1 row.
+    List<Comparable[]> rows = new ArrayList<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    String headerStr = objectMapper.writeValueAsString(dataBlock.getLogBlockHeader());
+    String footerStr = objectMapper.writeValueAsString(dataBlock.getLogBlockFooter());
+    Comparable[] output = new Comparable[]{INSTANT_TIME, 100, dataBlock.getBlockType(), headerStr, footerStr};
+    rows.add(output);
+
+    String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
+
+    assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'show logfile records'.
+   */
+  @Test
+  public void testShowLogFileRecords() throws IOException, URISyntaxException {
+    CommandResult cr = getShell().executeCommand("show logfile records --logFilePathPattern " + partitionPath + "/*");
+    assertTrue(cr.isSuccess());
+
+    // construct the expected result from 10 generated records.
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 10);
+    String[][] rows = records.stream().map(r -> new String[]{r.toString()}).toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_RECORDS}, rows);
+
+    assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'show logfile records' with merge.
+   */
+  @Test
+  public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedException, URISyntaxException {
+    // create commit instant
+    HoodieTestCommitMetadataGenerator.createCommitFile(tablePath, INSTANT_TIME, HoodieCLI.conf);
+
+    // write to path '2015/03/16'.
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    partitionPath = tablePath + File.separator + HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    Files.createDirectories(Paths.get(partitionPath));
+
+    HoodieLogFormat.Writer writer = null;
+    try {
+      // use a small size threshold so the writer rolls over to a new log file.
+      writer =
+          HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionPath))
+              .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+              .withFileId("test-log-fileid1").overBaseCommit(INSTANT_TIME).withFs(fs).withSizeThreshold(500).build();
+
+      List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+      HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+      writer = writer.appendBlock(dataBlock);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+
+    CommandResult cr = getShell().executeCommand("show logfile records --logFilePathPattern "
+        + partitionPath + "/* --mergeRecords true");
+    assertTrue(cr.isSuccess());
+
+    // build the expected result from the first 10 merged records.
+    List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(partitionPath + "/*")))
+        .map(status -> status.getPath().toString()).collect(Collectors.toList());
+    HoodieMergedLogRecordScanner scanner =
+        new HoodieMergedLogRecordScanner(fs, tablePath, logFilePaths, schema, INSTANT_TIME,
+            HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
+            Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
+            Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
+            HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
+            HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+
+    Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
+    int num = 0;
+    int maxSize = 10;
+    List<IndexedRecord> indexRecords = new ArrayList<>();
+    while (records.hasNext() && num < maxSize) {
+      Option<IndexedRecord> hoodieRecord = records.next().getData().getInsertValue(schema);
+      indexRecords.add(hoodieRecord.get());
+      num++;
+    }
+    String[][] rows = indexRecords.stream().map(r -> new String[]{r.toString()}).toArray(String[][]::new);
+    assertNotNull(rows);
+
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_RECORDS}, rows);
+
+    assertEquals(expected, cr.getResult().toString());
+  }
+}
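
(As a usage note on the tests above: each one renders its expected output with
HoodiePrintHelper and compares it against the shell result. A minimal sketch of
that pattern, assuming the hudi-cli classes are on the classpath; the row
strings here are hypothetical stand-ins for IndexedRecord#toString() output.)

    import org.apache.hudi.cli.HoodiePrintHelper;
    import org.apache.hudi.cli.HoodieTableHeaderFields;

    public class ExpectedRecordsTableSketch {
      public static void main(String[] args) {
        // Hypothetical record strings; the tests derive these from generated Avro records.
        String[][] rows = {
            {"{\"timestamp\": 0, \"_row_key\": \"key1\"}"},
            {"{\"timestamp\": 1, \"_row_key\": \"key2\"}"}
        };

        // Render the same single-column "Records" table the command prints;
        // a test would then do assertEquals(expected, cr.getResult().toString()).
        String expected = HoodiePrintHelper.print(
            new String[] {HoodieTableHeaderFields.HEADER_RECORDS}, rows);
        System.out.println(expected);
      }
    }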