Posted to commits@hudi.apache.org by na...@apache.org on 2020/08/25 19:40:26 UTC
[hudi] branch master updated: [HUDI-1191] Add incremental meta client API to query partitions modified in a time window
This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 492ddcb [HUDI-1191] Add incremental meta client API to query partitions modified in a time window
492ddcb is described below
commit 492ddcbb06e107618ae71fa369ba07af47a036fb
Author: Satish Kotha <sa...@uber.com>
AuthorDate: Thu Aug 13 17:14:25 2020 -0700
[HUDI-1191] Add incremental meta client API to query partitions modified in a time window
---
.../hudi/common/table/timeline/TimelineUtils.java | 101 ++++++++++
.../hudi/common/table/TestTimelineUtils.java | 213 +++++++++++++++++++++
.../hudi/sync/common/AbstractSyncHoodieClient.java | 26 +--
3 files changed, 319 insertions(+), 21 deletions(-)
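For readers skimming the diff, here is a minimal sketch of how a caller might use the new API. It assumes an already-constructed HoodieTableMetaClient named metaClient and an instant time lastSyncTime; both names are illustrative, not part of this commit.

    import org.apache.hudi.common.table.timeline.TimelineUtils;
    import java.util.List;

    // Partitions written by commits/delta commits strictly after lastSyncTime
    // (mirrors the AbstractSyncHoodieClient change at the end of this diff).
    List<String> written = TimelineUtils.getPartitionsWritten(
        metaClient.getActiveTimeline().getCommitsTimeline()
            .findInstantsAfter(lastSyncTime, Integer.MAX_VALUE));

    // Partitions touched by any completed action in the same window,
    // including internal operations such as clean, rollback, restore and savepoint.
    List<String> affected = TimelineUtils.getAffectedPartitions(
        metaClient.getActiveTimeline().findInstantsAfter(lastSyncTime, Integer.MAX_VALUE));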
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
new file mode 100644
index 0000000..fd30ee3
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.exception.HoodieIOException;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * TimelineUtils provides a common way to query incremental metadata changes for a hoodie table.
+ *
+ * This is useful in multiple places, including:
+ * 1) HiveSync - this can be used to query partitions that changed since the previous sync.
+ * 2) Incremental reads - InputFormats can use this API to query partitions modified in a given time window.
+ */
+public class TimelineUtils {
+
+ /**
+ * Returns partitions that have new data written by the commit actions in the given timeline.
+ * Does not include internal operations such as clean.
+ */
+ public static List<String> getPartitionsWritten(HoodieTimeline timeline) {
+ HoodieTimeline timelineToSync = timeline.getCommitsAndCompactionTimeline();
+ return getAffectedPartitions(timelineToSync);
+ }
+
+ /**
+ * Returns partitions that have been modified in the passed timeline, including by internal operations such as clean.
+ */
+ public static List<String> getAffectedPartitions(HoodieTimeline timeline) {
+ return timeline.filterCompletedInstants().getInstants().flatMap(s -> {
+ switch (s.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ try {
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(s).get(), HoodieCommitMetadata.class);
+ return commitMetadata.getPartitionToWriteStats().keySet().stream();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to get partitions written at " + s, e);
+ }
+ case HoodieTimeline.CLEAN_ACTION:
+ try {
+ HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(s).get());
+ return cleanMetadata.getPartitionMetadata().keySet().stream();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to get partitions cleaned at " + s, e);
+ }
+ case HoodieTimeline.ROLLBACK_ACTION:
+ try {
+ return TimelineMetadataUtils.deserializeHoodieRollbackMetadata(timeline.getInstantDetails(s).get()).getPartitionMetadata().keySet().stream();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to get partitions rolled back at " + s, e);
+ }
+ case HoodieTimeline.RESTORE_ACTION:
+ try {
+ HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeAvroMetadata(timeline.getInstantDetails(s).get(), HoodieRestoreMetadata.class);
+ return restoreMetadata.getHoodieRestoreMetadata().values().stream()
+ .flatMap(Collection::stream)
+ .flatMap(rollbackMetadata -> rollbackMetadata.getPartitionMetadata().keySet().stream());
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to get partitions restored at " + s, e);
+ }
+ case HoodieTimeline.SAVEPOINT_ACTION:
+ try {
+ return TimelineMetadataUtils.deserializeHoodieSavepointMetadata(timeline.getInstantDetails(s).get()).getPartitionMetadata().keySet().stream();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to get partitions savepointed at " + s, e);
+ }
+ case HoodieTimeline.COMPACTION_ACTION:
+ // A pending compaction is not a completed instant, so there is no need to consider this action.
+ return Stream.empty();
+ default:
+ throw new HoodieIOException("Unknown action in timeline " + s.getAction());
+ }
+
+ }).distinct().filter(s -> !s.isEmpty()).collect(Collectors.toList());
+ }
+}
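The second use case named in the class Javadoc, incremental reads over a bounded window, would look like the sketch below; startTs and endTs are illustrative instant times, and findInstantsInRange treats the window as (startTs, endTs], as the tests below confirm.

    // Partitions modified in (startTs, endTs], including internal
    // operations such as clean (sketch only, not part of this commit).
    List<String> changed = TimelineUtils.getAffectedPartitions(
        metaClient.getActiveTimeline().findInstantsInRange(startTs, endTs));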
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
new file mode 100644
index 0000000..1a1ac54
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestTimelineUtils extends HoodieCommonTestHarness {
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ initMetaClient();
+ }
+
+ @Test
+ public void testGetPartitions() throws IOException {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+ assertTrue(activeCommitTimeline.empty());
+
+ String olderPartition = "0"; // older partition that is modified by all cleans
+ for (int i = 1; i <= 5; i++) {
+ String ts = i + "";
+ HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
+ activeTimeline.createNewInstant(instant);
+ activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2)));
+
+ HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
+ activeTimeline.createNewInstant(cleanInstant);
+ activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(olderPartition, ts));
+ }
+
+ metaClient.reloadActiveTimeline();
+
+ // verify modified partitions include cleaned data
+ List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
+ assertEquals(5, partitions.size());
+ assertEquals(partitions, Arrays.asList(new String[]{"0", "2", "3", "4", "5"}));
+
+ partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
+ assertEquals(4, partitions.size());
+ assertEquals(partitions, Arrays.asList(new String[]{"0", "2", "3", "4"}));
+
+ // verify only commit actions
+ partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
+ assertEquals(4, partitions.size());
+ assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"}));
+
+ partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
+ assertEquals(3, partitions.size());
+ assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
+ }
+
+ @Test
+ public void testGetPartitionsUnpartitioned() throws IOException {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+ assertTrue(activeCommitTimeline.empty());
+
+ String partitionPath = "";
+ for (int i = 1; i <= 5; i++) {
+ String ts = i + "";
+ HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
+ activeTimeline.createNewInstant(instant);
+ activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2)));
+
+ HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
+ activeTimeline.createNewInstant(cleanInstant);
+ activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(partitionPath, ts));
+ }
+
+ metaClient.reloadActiveTimeline();
+
+ // verify modified partitions include cleaned data
+ List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
+ assertTrue(partitions.isEmpty());
+
+ partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
+ assertTrue(partitions.isEmpty());
+ }
+
+ @Test
+ public void testRestoreInstants() throws Exception {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+ assertTrue(activeCommitTimeline.empty());
+
+ for (int i = 1; i <= 5; i++) {
+ String ts = i + "";
+ HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, ts);
+ activeTimeline.createNewInstant(instant);
+ activeTimeline.saveAsComplete(instant, Option.of(getRestoreMetadata(basePath, ts, ts, 2)));
+ }
+
+ metaClient.reloadActiveTimeline();
+
+ // verify modified partitions include restored data
+ List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
+ assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"}));
+
+ partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
+ assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
+ }
+
+ private byte[] getRestoreMetadata(String basePath, String partition, String commitTs, int count) throws IOException {
+ HoodieRestoreMetadata metadata = new HoodieRestoreMetadata();
+ List<HoodieRollbackMetadata> rollbackM = new ArrayList<>();
+ rollbackM.add(getRollbackMetadataInstance(basePath, partition, commitTs, count));
+ metadata.setHoodieRestoreMetadata(CollectionUtils.createImmutableMap(commitTs, rollbackM));
+ List<String> rollbackInstants = new ArrayList<>();
+ rollbackInstants.add(commitTs);
+ metadata.setInstantsToRollback(rollbackInstants);
+ metadata.setStartRestoreTime(commitTs);
+ return TimelineMetadataUtils.serializeRestoreMetadata(metadata).get();
+ }
+
+ private HoodieRollbackMetadata getRollbackMetadataInstance(String basePath, String partition, String commitTs, int count) {
+ List<String> deletedFiles = new ArrayList<>();
+ for (int i = 1; i <= count; i++) {
+ deletedFiles.add("file-" + i);
+ }
+ List<String> rollbacks = new ArrayList<>();
+ rollbacks.add(commitTs);
+
+ HoodieRollbackStat rollbackStat = new HoodieRollbackStat(partition, deletedFiles, Collections.emptyList(), Collections.emptyMap());
+ List<HoodieRollbackStat> rollbackStats = new ArrayList<>();
+ rollbackStats.add(rollbackStat);
+ return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats);
+ }
+
+ private byte[] getCommitMetadata(String basePath, String partition, String commitTs, int count)
+ throws IOException {
+ HoodieCommitMetadata commit = new HoodieCommitMetadata();
+ for (int i = 1; i <= count; i++) {
+ HoodieWriteStat stat = new HoodieWriteStat();
+ stat.setFileId(i + "");
+ stat.setPartitionPath(Paths.get(basePath, partition).toString());
+ stat.setPath(commitTs + "." + i + ".parquet");
+ commit.addWriteStat(partition, stat);
+ }
+ return commit.toJsonString().getBytes(StandardCharsets.UTF_8);
+ }
+
+ private Option<byte[]> getCleanMetadata(String partition, String time) throws IOException {
+ Map<String, HoodieCleanPartitionMetadata> partitionToFilesCleaned = new HashMap<>();
+ List<String> filesDeleted = new ArrayList<>();
+ filesDeleted.add("file-" + partition + "-" + time + "1");
+ filesDeleted.add("file-" + partition + "-" + time + "2");
+ HoodieCleanPartitionMetadata partitionMetadata = HoodieCleanPartitionMetadata.newBuilder()
+ .setPartitionPath(partition)
+ .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+ .setFailedDeleteFiles(Collections.emptyList())
+ .setDeletePathPatterns(Collections.emptyList())
+ .setSuccessDeleteFiles(filesDeleted)
+ .build();
+ partitionToFilesCleaned.putIfAbsent(partition, partitionMetadata);
+ HoodieCleanMetadata cleanMetadata = HoodieCleanMetadata.newBuilder()
+ .setVersion(1)
+ .setTimeTakenInMillis(100)
+ .setTotalFilesDeleted(1)
+ .setStartCleanTime(time)
+ .setEarliestCommitToRetain(time)
+ .setPartitionMetadata(partitionToFilesCleaned).build();
+
+ return TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata);
+ }
+
+}
\ No newline at end of file
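To make the window semantics concrete, here is a worked example from testGetPartitions above, where the commit at instant ts writes partition ts (ts = 1..5) and every clean touches partition "0":

    findInstantsAfter("1", 10)    -> instants 2..5
      getAffectedPartitions       -> [0, 2, 3, 4, 5]  (cleans pull in "0")
      getPartitionsWritten        -> [2, 3, 4, 5]     (commit actions only)
    findInstantsInRange("1", "4") -> instants 2..4
      getAffectedPartitions       -> [0, 2, 3, 4]
      getPartitionsWritten        -> [2, 3, 4]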
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index fe7f1e3..419ea16 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -20,16 +20,14 @@ package org.apache.hudi.sync.common;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
-
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -40,12 +38,10 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
public abstract class AbstractSyncHoodieClient {
private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
protected final HoodieTableMetaClient metaClient;
- protected HoodieTimeline activeTimeline;
protected final HoodieTableType tableType;
protected final FileSystem fs;
private String basePath;
@@ -57,7 +53,6 @@ public abstract class AbstractSyncHoodieClient {
this.basePath = basePath;
this.assumeDatePartitioning = assumeDatePartitioning;
this.fs = fs;
- this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
}
public abstract void createTable(String tableName, MessageType storageSchema,
@@ -75,10 +70,6 @@ public abstract class AbstractSyncHoodieClient {
public abstract Map<String, String> getTableSchema(String tableName);
- public HoodieTimeline getActiveTimeline() {
- return activeTimeline;
- }
-
public HoodieTableType getTableType() {
return tableType;
}
@@ -135,21 +126,14 @@ public abstract class AbstractSyncHoodieClient {
}
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
-
- HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE);
- return timelineToSync.getInstants().map(s -> {
- try {
- return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(s).get(), HoodieCommitMetadata.class);
- } catch (IOException e) {
- throw new HoodieIOException("Failed to get partitions written since " + lastCommitTimeSynced, e);
- }
- }).flatMap(s -> s.getPartitionToWriteStats().keySet().stream()).distinct().collect(Collectors.toList());
+ return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline()
+ .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
}
}
/**
* Read the schema from the log file on path.
- */
+ */
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path) throws Exception {
MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, path);