Posted to commits@hudi.apache.org by na...@apache.org on 2020/08/25 19:40:26 UTC

[hudi] branch master updated: [HUDI-1191] Add incremental meta client API to query partitions modified in a time window

This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 492ddcb  [HUDI-1191] Add incremental meta client API to query partitions modified in a time window
492ddcb is described below

commit 492ddcbb06e107618ae71fa369ba07af47a036fb
Author: Satish Kotha <sa...@uber.com>
AuthorDate: Thu Aug 13 17:14:25 2020 -0700

    [HUDI-1191] Add incremental meta client API to query partitions modified in a time window
---
 .../hudi/common/table/timeline/TimelineUtils.java  | 101 ++++++++++
 .../hudi/common/table/TestTimelineUtils.java       | 213 +++++++++++++++++++++
 .../hudi/sync/common/AbstractSyncHoodieClient.java |  26 +--
 3 files changed, 319 insertions(+), 21 deletions(-)
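
For reference, a minimal sketch (not part of this commit) of how the new API can
be used to find partitions modified in a time window, given an existing
HoodieTableMetaClient; the class name and helper method below are illustrative
only:

    import java.util.List;

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.TimelineUtils;

    public class IncrementalPartitionLookup {
      // Hypothetical helper: partitions touched by any completed action whose
      // instant time falls in (startTs, endTs], per findInstantsInRange semantics.
      public static List<String> partitionsChangedInWindow(HoodieTableMetaClient metaClient,
                                                           String startTs, String endTs) {
        return TimelineUtils.getAffectedPartitions(
            metaClient.getActiveTimeline().findInstantsInRange(startTs, endTs));
      }
    }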

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
new file mode 100644
index 0000000..fd30ee3
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.exception.HoodieIOException;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * TimelineUtils provides a common way to query incremental meta-data changes for a hoodie table.
+ *
+ * This is useful in multiple places including:
+ * 1) HiveSync - this can be used to query partitions that changed since the previous sync.
+ * 2) Incremental reads - InputFormats can use this API to query partitions modified in a given time window.
+ */
+public class TimelineUtils {
+
+  /**
+   * Returns partitions that have new data written by the commits in the given timeline.
+   * Internal operations such as clean are not included.
+   */
+  public static List<String> getPartitionsWritten(HoodieTimeline timeline) {
+    HoodieTimeline timelineToSync = timeline.getCommitsAndCompactionTimeline();
+    return getAffectedPartitions(timelineToSync);
+  }
+
+  /**
+   * Returns partitions that have been modified by any action in the passed timeline, including internal operations such as clean.
+   */
+  public static List<String> getAffectedPartitions(HoodieTimeline timeline) {
+    return timeline.filterCompletedInstants().getInstants().flatMap(s -> {
+      switch (s.getAction()) {
+        case HoodieTimeline.COMMIT_ACTION:
+        case HoodieTimeline.DELTA_COMMIT_ACTION:
+          try {
+            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(s).get(), HoodieCommitMetadata.class);
+            return commitMetadata.getPartitionToWriteStats().keySet().stream();
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to get partitions written at " + s, e);
+          }
+        case HoodieTimeline.CLEAN_ACTION:
+          try {
+            HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(s).get());
+            return cleanMetadata.getPartitionMetadata().keySet().stream();
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to get partitions cleaned at " + s, e);
+          }
+        case HoodieTimeline.ROLLBACK_ACTION:
+          try {
+            return TimelineMetadataUtils.deserializeHoodieRollbackMetadata(timeline.getInstantDetails(s).get()).getPartitionMetadata().keySet().stream();
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to get partitions rolled back at " + s, e);
+          }
+        case HoodieTimeline.RESTORE_ACTION:
+          try {
+            HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeAvroMetadata(timeline.getInstantDetails(s).get(), HoodieRestoreMetadata.class);
+            return restoreMetadata.getHoodieRestoreMetadata().values().stream()
+                .flatMap(Collection::stream)
+                .flatMap(rollbackMetadata -> rollbackMetadata.getPartitionMetadata().keySet().stream());
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to get partitions restored at " + s, e);
+          }
+        case HoodieTimeline.SAVEPOINT_ACTION:
+          try {
+            return TimelineMetadataUtils.deserializeHoodieSavepointMetadata(timeline.getInstantDetails(s).get()).getPartitionMetadata().keySet().stream();
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to get partitions savepointed at " + s, e);
+          }
+        case HoodieTimeline.COMPACTION_ACTION:
+          // compaction is never a completed instant, so there is no need to consider this action.
+          return Stream.empty();
+        default:
+          throw new HoodieIOException("unknown action in timeline " + s.getAction());
+      }
+
+    }).distinct().filter(s -> !s.isEmpty()).collect(Collectors.toList());
+  }
+}
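
The class javadoc above calls out HiveSync as one consumer; a sketch of that
usage, mirroring the AbstractSyncHoodieClient change further down in this
commit (metaClient is an existing HoodieTableMetaClient and the
lastCommitTimeSynced value is a placeholder):

    // Partitions with new data since the last synced commit; internal actions
    // such as clean, rollback, restore and savepoint are ignored here.
    String lastCommitTimeSynced = "20200820000000"; // placeholder instant time
    List<String> partitionsToSync = TimelineUtils.getPartitionsWritten(
        metaClient.getActiveTimeline().getCommitsTimeline()
            .findInstantsAfter(lastCommitTimeSynced, Integer.MAX_VALUE));
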
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
new file mode 100644
index 0000000..1a1ac54
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestTimelineUtils extends HoodieCommonTestHarness {
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initMetaClient();
+  }
+
+  @Test
+  public void testGetPartitions() throws IOException {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+    assertTrue(activeCommitTimeline.empty());
+
+    String olderPartition = "0"; // older partition that is modified by all cleans
+    for (int i = 1; i <= 5; i++) {
+      String ts = i + "";
+      HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
+      activeTimeline.createNewInstant(instant);
+      activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2)));
+
+      HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
+      activeTimeline.createNewInstant(cleanInstant);
+      activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(olderPartition, ts));
+    }
+
+    metaClient.reloadActiveTimeline();
+
+    // verify that modified partitions include the cleaned partition
+    List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
+    assertEquals(5, partitions.size());
+    assertEquals(partitions, Arrays.asList(new String[]{"0", "2", "3", "4", "5"}));
+
+    partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
+    assertEquals(4, partitions.size());
+    assertEquals(partitions, Arrays.asList(new String[]{"0", "2", "3", "4"}));
+
+    // verify only commit actions
+    partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
+    assertEquals(4, partitions.size());
+    assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"}));
+
+    partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
+    assertEquals(3, partitions.size());
+    assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
+  }
+
+  @Test
+  public void testGetPartitionsUnpartitioned() throws IOException {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+    assertTrue(activeCommitTimeline.empty());
+
+    String partitionPath = "";
+    for (int i = 1; i <= 5; i++) {
+      String ts = i + "";
+      HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
+      activeTimeline.createNewInstant(instant);
+      activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2)));
+
+      HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
+      activeTimeline.createNewInstant(cleanInstant);
+      activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(partitionPath, ts));
+    }
+
+    metaClient.reloadActiveTimeline();
+
+    // for an unpartitioned table the partition path is empty, so no affected partitions are reported
+    List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
+    assertTrue(partitions.isEmpty());
+
+    partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
+    assertTrue(partitions.isEmpty());
+  }
+
+  @Test
+  public void testRestoreInstants() throws Exception {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+    assertTrue(activeCommitTimeline.empty());
+
+    for (int i = 1; i <= 5; i++) {
+      String ts = i + "";
+      HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, ts);
+      activeTimeline.createNewInstant(instant);
+      activeTimeline.saveAsComplete(instant, Option.of(getRestoreMetadata(basePath, ts, ts, 2)));
+    }
+
+    metaClient.reloadActiveTimeline();
+
+    // verify partitions modified by restore actions are reported
+    List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
+    assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"}));
+
+    partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
+    assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
+  }
+
+  private byte[] getRestoreMetadata(String basePath, String partition, String commitTs, int count) throws IOException {
+    HoodieRestoreMetadata metadata = new HoodieRestoreMetadata();
+    List<HoodieRollbackMetadata> rollbackM = new ArrayList<>();
+    rollbackM.add(getRollbackMetadataInstance(basePath, partition, commitTs, count));
+    metadata.setHoodieRestoreMetadata(CollectionUtils.createImmutableMap(commitTs, rollbackM));
+    List<String> rollbackInstants = new ArrayList<>();
+    rollbackInstants.add(commitTs);
+    metadata.setInstantsToRollback(rollbackInstants);
+    metadata.setStartRestoreTime(commitTs);
+    return TimelineMetadataUtils.serializeRestoreMetadata(metadata).get();
+  }
+
+  private HoodieRollbackMetadata getRollbackMetadataInstance(String basePath, String partition, String commitTs, int count) {
+    List<String> deletedFiles = new ArrayList<>();
+    for (int i = 1; i <= count; i++) {
+      deletedFiles.add("file-" + i);
+    }
+    List<String> rollbacks = new ArrayList<>();
+    rollbacks.add(commitTs);
+
+    HoodieRollbackStat rollbackStat = new HoodieRollbackStat(partition, deletedFiles, Collections.emptyList(), Collections.emptyMap());
+    List<HoodieRollbackStat> rollbackStats = new ArrayList<>();
+    rollbackStats.add(rollbackStat);
+    return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats);
+  }
+
+  private byte[] getCommitMetadata(String basePath, String partition, String commitTs, int count)
+      throws IOException {
+    HoodieCommitMetadata commit = new HoodieCommitMetadata();
+    for (int i = 1; i <= count; i++) {
+      HoodieWriteStat stat = new HoodieWriteStat();
+      stat.setFileId(i + "");
+      stat.setPartitionPath(Paths.get(basePath, partition).toString());
+      stat.setPath(commitTs + "." + i + ".parquet");
+      commit.addWriteStat(partition, stat);
+    }
+    return commit.toJsonString().getBytes(StandardCharsets.UTF_8);
+  }
+
+  private Option<byte[]> getCleanMetadata(String partition, String time) throws IOException {
+    Map<String, HoodieCleanPartitionMetadata> partitionToFilesCleaned = new HashMap<>();
+    List<String> filesDeleted = new ArrayList<>();
+    filesDeleted.add("file-" + partition + "-" + time + "1");
+    filesDeleted.add("file-" + partition + "-" + time + "2");
+    HoodieCleanPartitionMetadata partitionMetadata = HoodieCleanPartitionMetadata.newBuilder()
+        .setPartitionPath(partition)
+        .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+        .setFailedDeleteFiles(Collections.emptyList())
+        .setDeletePathPatterns(Collections.emptyList())
+        .setSuccessDeleteFiles(filesDeleted)
+        .build();
+    partitionToFilesCleaned.putIfAbsent(partition, partitionMetadata);
+    HoodieCleanMetadata cleanMetadata = HoodieCleanMetadata.newBuilder()
+        .setVersion(1)
+        .setTimeTakenInMillis(100)
+        .setTotalFilesDeleted(1)
+        .setStartCleanTime(time)
+        .setEarliestCommitToRetain(time)
+        .setPartitionMetadata(partitionToFilesCleaned).build();
+
+    return TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata);
+  }
+
+}
\ No newline at end of file
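
To exercise the new tests locally, a standard Maven invocation from the
repository root should work (module name taken from the file paths above; exact
surefire flags may vary by branch):

    mvn test -pl hudi-common -Dtest=TestTimelineUtils
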
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index fe7f1e3..419ea16 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -20,16 +20,14 @@ package org.apache.hudi.sync.common;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -40,12 +38,10 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public abstract class AbstractSyncHoodieClient {
   private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
   protected final HoodieTableMetaClient metaClient;
-  protected HoodieTimeline activeTimeline;
   protected final HoodieTableType tableType;
   protected final FileSystem fs;
   private String basePath;
@@ -57,7 +53,6 @@ public abstract class AbstractSyncHoodieClient {
     this.basePath = basePath;
     this.assumeDatePartitioning = assumeDatePartitioning;
     this.fs = fs;
-    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
   }
 
   public abstract void createTable(String tableName, MessageType storageSchema,
@@ -75,10 +70,6 @@ public abstract class AbstractSyncHoodieClient {
 
   public abstract Map<String, String> getTableSchema(String tableName);
 
-  public HoodieTimeline getActiveTimeline() {
-    return activeTimeline;
-  }
-
   public HoodieTableType getTableType() {
     return tableType;
   }
@@ -135,21 +126,14 @@ public abstract class AbstractSyncHoodieClient {
       }
     } else {
       LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
-
-      HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE);
-      return timelineToSync.getInstants().map(s -> {
-        try {
-          return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(s).get(), HoodieCommitMetadata.class);
-        } catch (IOException e) {
-          throw new HoodieIOException("Failed to get partitions written since " + lastCommitTimeSynced, e);
-        }
-      }).flatMap(s -> s.getPartitionToWriteStats().keySet().stream()).distinct().collect(Collectors.toList());
+      return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline()
+          .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
     }
   }
 
   /**
    * Read the schema from the log file on path.
-  */
+   */
   @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
   private MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path) throws Exception {
     MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, path);