You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@hudi.apache.org by na...@apache.org on 2020/08/28 19:24:07 UTC

[hudi] branch master updated: [HUDI-1228] Add utility method to query extra metadata

This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dbeabf  [HUDI-1228] Add utility method to query extra metadata
4dbeabf is described below

commit 4dbeabffa3b2b9f30c48f2deae3ea4f91570afdd
Author: Satish Kotha <sa...@uber.com>
AuthorDate: Tue Aug 25 21:48:28 2020 -0700

    [HUDI-1228] Add utility method to query extra metadata
---
 .../hudi/common/table/timeline/TimelineUtils.java  | 30 ++++++++++++++
 .../hudi/common/table/TestTimelineUtils.java       | 46 ++++++++++++++++++++--
 2 files changed, 73 insertions(+), 3 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index fd30ee3..95a2ae6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -21,11 +21,14 @@ package org.apache.hudi.common.table.timeline;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -98,4 +101,31 @@ public class TimelineUtils {
 
     }).distinct().filter(s -> !s.isEmpty()).collect(Collectors.toList());
   }
+
+  /**
+   * Get extra metadata for the specified key from the latest completed commit/deltacommit instant; empty if none.
+   */
+  public static Option<String> getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String extraMetadataKey) {
+    return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
+        .map(latest -> getMetadataValue(metaClient, extraMetadataKey, latest)).findFirst().orElseGet(Option::empty);
+  }
+
+  /**
+   * Get extra metadata for the specified key from all completed commit/deltacommit instants, keyed by instant time (map order unspecified).
+   */
+  public static Map<String, Option<String>> getAllExtraMetadataForKey(HoodieTableMetaClient metaClient, String extraMetadataKey) {
+    Stream<HoodieInstant> completedInstants = metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants();
+    return completedInstants.collect(Collectors.toMap(HoodieInstant::getTimestamp, commitInstant -> getMetadataValue(metaClient, extraMetadataKey, commitInstant)));
+  }
+
+  private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient, String extraMetadataKey, HoodieInstant instant) {
+    try {
+      // Read the instant's bytes from the active timeline directly instead of calling getCommitsTimeline() once per instant.
+      byte[] details = metaClient.getActiveTimeline().getInstantDetails(instant).get();
+      Map<String, String> extraMetadata = HoodieCommitMetadata.fromBytes(details, HoodieCommitMetadata.class).getExtraMetadata();
+      return extraMetadata == null ? Option.empty() : Option.ofNullable(extraMetadata.get(extraMetadataKey)); // tolerate null extraMetadata
+    } catch (IOException e) {
+      throw new HoodieIOException("Unable to parse instant metadata " + instant, e);
+    }
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index 1a1ac54..2ea418c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -48,6 +48,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestTimelineUtils extends HoodieCommonTestHarness {
@@ -68,7 +69,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
       String ts = i + "";
       HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
       activeTimeline.createNewInstant(instant);
-      activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2)));
+      activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
 
       HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
       activeTimeline.createNewInstant(cleanInstant);
@@ -107,7 +108,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
       String ts = i + "";
       HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
       activeTimeline.createNewInstant(instant);
-      activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2)));
+      activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2, Collections.emptyMap())));
 
       HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
       activeTimeline.createNewInstant(cleanInstant);
@@ -147,6 +148,42 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
     assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
   }
 
+  @Test
+  public void testGetExtraMetadata() throws Exception {
+    String extraMetadataKey = "test_key";
+    String extraMetadataValue1 = "test_value1";
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+    assertTrue(activeCommitTimeline.empty());
+    assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey).isPresent());
+
+    String ts = "0";
+    HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
+    activeTimeline.createNewInstant(instant);
+    activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
+
+    ts = "1";
+    instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
+    activeTimeline.createNewInstant(instant);
+    Map<String, String> extraMetadata = new HashMap<>();
+    extraMetadata.put(extraMetadataKey, extraMetadataValue1);
+    activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, extraMetadata)));
+
+    metaClient.reloadActiveTimeline();
+
+    // latest commit "1" carries the key; a key that was never written is absent
+    Option<String> extraLatestValue = TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey);
+    assertTrue(extraLatestValue.isPresent());
+    assertEquals(extraMetadataValue1, extraLatestValue.get());
+    assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, "unknownKey").isPresent());
+
+    Map<String, Option<String>> extraMetadataEntries = TimelineUtils.getAllExtraMetadataForKey(metaClient, extraMetadataKey);
+    assertEquals(2, extraMetadataEntries.size());
+    assertFalse(extraMetadataEntries.get("0").isPresent());
+    assertTrue(extraMetadataEntries.get("1").isPresent());
+    assertEquals(extraMetadataValue1, extraMetadataEntries.get("1").get());
+  }
+
   private byte[] getRestoreMetadata(String basePath, String partition, String commitTs, int count) throws IOException {
     HoodieRestoreMetadata metadata = new HoodieRestoreMetadata();
     List<HoodieRollbackMetadata> rollbackM = new ArrayList<>();
@@ -173,7 +210,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
     return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats);
   }
 
-  private byte[] getCommitMetadata(String basePath, String partition, String commitTs, int count)
+  private byte[] getCommitMetadata(String basePath, String partition, String commitTs, int count, Map<String, String> extraMetadata)
       throws IOException {
     HoodieCommitMetadata commit = new HoodieCommitMetadata();
     for (int i = 1; i <= count; i++) {
@@ -183,6 +220,9 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
       stat.setPath(commitTs + "." + i + ".parquet");
       commit.addWriteStat(partition, stat);
     }
+    for (Map.Entry<String, String> extraEntries : extraMetadata.entrySet()) {
+      commit.addMetadata(extraEntries.getKey(), extraEntries.getValue());
+    }
     return commit.toJsonString().getBytes(StandardCharsets.UTF_8);
   }