Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/08/09 18:31:48 UTC

[GitHub] [hudi] prashantwason commented on a change in pull request #3417: [HUDI-2281] Add metadata client APIs to fetch list of data files and …

prashantwason commented on a change in pull request #3417:
URL: https://github.com/apache/hudi/pull/3417#discussion_r685421308



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieIncrementalMetadataClient.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * <code>HoodieIncrementalMetadataClient</code> allows fetching details about incremental
+ * updates to the table using just the metadata.
+ */
+public class HoodieIncrementalMetadataClient implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIncrementalMetadataClient.class);
+  private HoodieTableMetaClient metaClient;
+
+  /**
+   * Creates a HoodieIncrementalMetadataClient object.
+   *
+   * @param conf Hadoop configuration
+   * @param basePath Base path of the Table
+   */
+  public HoodieIncrementalMetadataClient(Configuration conf, String basePath) throws IOException {
+    this(HoodieTableMetaClient.builder().setBasePath(basePath).setConf(conf).build());
+  }
+
+  /**
+   * Create HoodieIncrementalMetadataClient from HoodieTableMetaClient.
+   */
+  public HoodieIncrementalMetadataClient(HoodieTableMetaClient metaClient) {
+    this.metaClient = metaClient;
+  }
+
+  /**
+   * Get the underlying meta client object.
+   *
+   * @return Meta client
+   */
+  public HoodieTableMetaClient getMetaClient() {
+    return metaClient;
+  }
+
+  /**
+   * Reloads the underlying meta client.
+   */
+  public void reload() {
+    this.metaClient.reloadActiveTimeline();
+  }
+
+  /**
+   * Gets the partitions modified by a hoodie instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param hoodieInstant Hoodie instant
+   * @return Stream of the names of the modified partitions
+   * @throws HoodieIOException
+   */
+  private Stream<String> getPartitionNameFromInstant(

Review comment:
       Should this be "Names", as it returns more than one partition?
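
       A sketch of the rename, with the javadoc kept in sync (the body stays the existing switch over the instant action):

           /**
            * Gets the names of the partitions modified by a hoodie instant.
            *
            * @param timeline Hoodie Timeline
            * @param hoodieInstant Hoodie instant
            * @return Stream of the names of the modified partitions
            * @throws HoodieIOException
            */
           private Stream<String> getPartitionNamesFromInstant(
               HoodieTimeline timeline,
               HoodieInstant hoodieInstant) throws HoodieIOException {
             // ... unchanged switch over hoodieInstant.getAction() ...
           }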

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieSnapshotMetadataClient.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieSnapshotMetadataClient {
+  private static String TEST_WRITE_TOKEN = "1-0-1";
+  
+  @TempDir
+  public java.nio.file.Path folder;
+  
+  private String basePath;
+  private String partitionPath;
+  private String fullPartitionPath;
+  private HoodieSnapshotMetadataClient snapshotMetadataClient;
+  private HoodieTableMetaClient metaClient;
+
+  private String fileId1 = UUID.randomUUID().toString();
+  private String fileId2 = UUID.randomUUID().toString();
+  private String fileId3 = UUID.randomUUID().toString();
+  private String fileId4 = UUID.randomUUID().toString();
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    basePath = folder.resolve("dataset").toString();
+    partitionPath = "2016/05/01/";
+    fullPartitionPath = basePath + "/" + partitionPath;
+    HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    setupDataFiles();
+    snapshotMetadataClient = new HoodieSnapshotMetadataClient(HoodieTestUtils.getDefaultHadoopConf(), basePath);
+    metaClient = snapshotMetadataClient.getMetaClient();
+    
+  }
+  
+  private void setupDataFiles() throws IOException {
+    // Put some files in the partition
+    new File(fullPartitionPath).mkdirs();
+    String cleanTime1 = "0";
+    String commitTime1 = "1";
+    String commitTime2 = "2";
+    String commitTime3 = "3";
+    String commitTime4 = "4";
+
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+    new File(fullPartitionPath
+        + FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0, TEST_WRITE_TOKEN))
+        .createNewFile();
+
+    // Create commit/clean files
+    new File(basePath + "/.hoodie/" + cleanTime1 + ".clean").createNewFile();
+    new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
+    new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
+    new File(basePath + "/.hoodie/" + commitTime3 + ".commit").createNewFile();
+    new File(basePath + "/.hoodie/" + commitTime4 + ".commit").createNewFile();
+  }
+
+  @Test
+  public void testSnapshotMetadata() throws IOException {
+    assertEquals("4", snapshotMetadataClient.getLatestInstant().get());
+    
+    Set<String> fileIds = snapshotMetadataClient.getLatestSnapshotFiles(partitionPath).map(FSUtils::getFileIdFromFilePath)
+        .collect(Collectors.toSet());
+    
+    // fileId4 has only a log file, so ensure it doesn't show up in the results.
+    assertEquals(Stream.of(fileId1, fileId2, fileId3).collect(Collectors.toSet()), fileIds);
+    
+    Set<String> fileIdsAt2 = snapshotMetadataClient.getSnapshotFilesAt("2", partitionPath).map(FSUtils::getFileIdFromFilePath)

Review comment:
       Use commitTime2 instead of hardcoding "2".
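
       A sketch of that change, assuming the commit times are hoisted out of setupDataFiles() into fields so the fixture and the assertions share one definition (the field below is illustrative, not from this PR):

           // Commit time shared by setupDataFiles() and the assertions.
           private static final String COMMIT_TIME_2 = "2";

           // In testSnapshotMetadata(), completed with the same collect() used
           // for the earlier latest-snapshot query:
           Set<String> fileIdsAt2 = snapshotMetadataClient.getSnapshotFilesAt(COMMIT_TIME_2, partitionPath)
               .map(FSUtils::getFileIdFromFilePath)
               .collect(Collectors.toSet());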

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieIncrementalMetadataClient.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * <code>HoodieIncrementalMetadataClient</code> allows fetching details about incremental
+ * updates to the table using just the metadata.
+ */
+public class HoodieIncrementalMetadataClient implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIncrementalMetadataClient.class);
+  private HoodieTableMetaClient metaClient;
+
+  /**
+   * Creates a HoodieIncrementalMetadataClient object.
+   *
+   * @param conf Hadoop configuration
+   * @param basePath Base path of the Table
+   */
+  public HoodieIncrementalMetadataClient(Configuration conf, String basePath) throws IOException {
+    this(HoodieTableMetaClient.builder().setBasePath(basePath).setConf(conf).build());
+  }
+
+  /**
+   * Create HoodieIncrementalMetadataClient from HoodieTableMetaClient.
+   */
+  public HoodieIncrementalMetadataClient(HoodieTableMetaClient metaClient) {
+    this.metaClient = metaClient;
+  }
+
+  /**
+   * Get the underlying meta client object.
+   *
+   * @return Meta client
+   */
+  public HoodieTableMetaClient getMetaClient() {
+    return metaClient;
+  }
+
+  /**
+   * Reloads the underlying meta client.
+   */
+  public void reload() {
+    this.metaClient.reloadActiveTimeline();
+  }
+
+  /**
+   * Gets the partitions modified by a hoodie instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param hoodieInstant Hoodie instant
+   * @return Stream of the names of the modified partitions
+   * @throws HoodieIOException
+   */
+  private Stream<String> getPartitionNameFromInstant(
+      HoodieTimeline timeline,
+      HoodieInstant hoodieInstant) throws HoodieIOException {
+    switch (hoodieInstant.getAction()) {
+      case HoodieTimeline.COMMIT_ACTION:
+      case HoodieTimeline.DELTA_COMMIT_ACTION:
+        HoodieCommitMetadata commitMetadata;
+        try {
+          commitMetadata = HoodieCommitMetadata
+              .fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
+                  HoodieCommitMetadata.class);
+        } catch (IOException e) {
+          throw new HoodieIOException("Unable to deserialize instant from avro", e);
+        }
+        return commitMetadata.getPartitionToWriteStats().keySet().stream();
+      case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        HoodieReplaceCommitMetadata replaceCommitMetadata;
+        try {
+          replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
+              HoodieReplaceCommitMetadata.class);
+        } catch (IOException e) {
+          throw new HoodieIOException("Unable to deserialize instant from avro", e);
+        }
+        Set<String> allPartitions = new HashSet<>(replaceCommitMetadata.getPartitionToWriteStats().keySet());
+        allPartitions.addAll(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet());
+        return allPartitions.stream();
+      case HoodieTimeline.CLEAN_ACTION:
+        try {
+          HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(hoodieInstant).get());
+          return cleanMetadata.getPartitionMetadata().keySet().stream();
+        } catch (IOException e) {
+          throw new HoodieIOException("unable to deserialize clean plan from avro", e);
+        }
+      default:
+        return Stream.empty();
+    }
+  }
+
+  /**
+   * Filters the instants in the timeline by the given time range and returns the affected partitions.
+   *
+   * @param timeline Hoodie timeline
+   * @param beginTs  Start commit timestamp
+   * @param endTs    End commit timestamp
+   * @return Stream of the names of the partitions modified within the range
+   */
+  private Stream<String> getPartitionNames(HoodieTimeline timeline, String beginTs, String endTs) {
+    return timeline.getInstants()
+        .filter(instant -> (HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, beginTs) 

Review comment:
       Can use the isInRange function of the timeline for this range check.
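
       A sketch of that simplification, assuming the static HoodieTimeline.isInRange(timestamp, startTs, endTs) helper implements the same (beginTs, endTs] check; the flatMap over getPartitionNameFromInstant is a guess at the rest of the method, which the hunk cuts off:

           private Stream<String> getPartitionNames(HoodieTimeline timeline, String beginTs, String endTs) {
             return timeline.getInstants()
                 // isInRange covers beginTs < timestamp <= endTs in one call
                 .filter(instant -> HoodieTimeline.isInRange(instant.getTimestamp(), beginTs, endTs))
                 .flatMap(instant -> getPartitionNameFromInstant(timeline, instant))
                 .distinct();
           }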




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org