Posted to commits@hudi.apache.org by si...@apache.org on 2023/04/25 19:12:25 UTC

[hudi] branch master updated: [HUDI-6090] Optimise payload size for list of FileGroupDTO (#8480)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e641b2530e [HUDI-6090] Optimise payload size for list of FileGroupDTO (#8480)
3e641b2530e is described below

commit 3e641b2530e1e6ba2428a62229be18c9b44c8112
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Wed Apr 26 00:42:18 2023 +0530

    [HUDI-6090] Optimise payload size for list of FileGroupDTO (#8480)
    
    - Optimise FileGroupDTO - avoid sending Timeline with every file group
    
    ---------
    
    Co-authored-by: sivabalan <n....@gmail.com>
---
 .../hudi/common/table/timeline/dto/DTOUtils.java   | 62 ++++++++++++++++++++++
 .../common/table/timeline/dto/FileGroupDTO.java    | 15 +++---
 .../view/RemoteHoodieTableFileSystemView.java      |  9 ++--
 .../hudi/timeline/service/RequestHandler.java      |  2 +-
 .../service/handlers/FileSliceHandler.java         | 14 +++--
 .../TestRemoteHoodieTableFileSystemView.java       | 54 +++++++++++++++++++
 6 files changed, 140 insertions(+), 16 deletions(-)
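
As a reading aid (not part of the patch), here is a minimal sketch of the round trip this change introduces, assuming fileGroups is a non-empty List<HoodieFileGroup> and metaClient is a HoodieTableMetaClient already in scope; the helper names come from the new DTOUtils class in the diff below:

    // Server side (FileSliceHandler): only the first DTO carries the serialized timeline.
    List<FileGroupDTO> dtos = DTOUtils.fileGroupDTOsfromFileGroups(fileGroups);

    // Client side (RemoteHoodieTableFileSystemView): the timeline is deserialized once
    // from dtos.get(0) and shared by every restored HoodieFileGroup.
    Stream<HoodieFileGroup> restored = DTOUtils.fileGroupDTOsToFileGroups(dtos, metaClient);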

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/DTOUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/DTOUtils.java
new file mode 100644
index 00000000000..ef5a8869487
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/DTOUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline.dto;
+
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+/**
+ * DTO utils to hold batch apis.
+ */
+public class DTOUtils {
+
+  public static List<FileGroupDTO> fileGroupDTOsfromFileGroups(List<HoodieFileGroup> fileGroups) {
+    if (fileGroups.isEmpty()) {
+      return Collections.emptyList();
+    } else if (fileGroups.size() == 1) {
+      return Collections.singletonList(FileGroupDTO.fromFileGroup(fileGroups.get(0), true));
+    } else {
+      List<FileGroupDTO> fileGroupDTOS = new ArrayList<>();
+      fileGroupDTOS.add(FileGroupDTO.fromFileGroup(fileGroups.get(0), true));
+      fileGroupDTOS.addAll(fileGroups.subList(1, fileGroups.size()).stream()
+          .map(fg -> FileGroupDTO.fromFileGroup(fg, false)).collect(Collectors.toList()));
+      return fileGroupDTOS;
+    }
+  }
+
+  public static Stream<HoodieFileGroup> fileGroupDTOsToFileGroups(List<FileGroupDTO> dtos, HoodieTableMetaClient metaClient) {
+    if (dtos.isEmpty()) {
+      return Stream.empty();
+    }
+
+    // Timeline exists only in the first file group DTO. Optimisation to reduce payload size.
+    checkState(dtos.get(0).timeline != null, "Timeline is expected to be set for the first FileGroupDTO");
+    HoodieTimeline timeline = TimelineDTO.toTimeline(dtos.get(0).timeline, metaClient);
+    return dtos.stream().map(dto -> FileGroupDTO.toFileGroup(dto, timeline));
+  }
+}
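
A caveat worth noting (an illustration, not part of the patch): the conversion back relies on the list order produced by fileGroupDTOsfromFileGroups, since only the first element carries the timeline. Assuming dtos was built from two or more file groups as in the sketch above, a caller must not drop or reorder elements before converting:

    // Hypothetical misuse: stripping the first element leaves only timeline-less DTOs,
    // so the checkState precondition above fails when converting back.
    List<FileGroupDTO> tail = dtos.subList(1, dtos.size());
    DTOUtils.fileGroupDTOsToFileGroups(tail, metaClient); // fails: first DTO has no timeline
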
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileGroupDTO.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileGroupDTO.java
index dfbd40126c0..bc5cbdb8022 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileGroupDTO.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileGroupDTO.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.common.table.timeline.dto;
 
 import org.apache.hudi.common.model.HoodieFileGroup;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -45,19 +45,20 @@ public class FileGroupDTO {
   @JsonProperty("timeline")
   TimelineDTO timeline;
 
-  public static FileGroupDTO fromFileGroup(HoodieFileGroup fileGroup) {
+  public static FileGroupDTO fromFileGroup(HoodieFileGroup fileGroup, boolean includeTimeline) {
     FileGroupDTO dto = new FileGroupDTO();
     dto.partition = fileGroup.getPartitionPath();
     dto.id = fileGroup.getFileGroupId().getFileId();
     dto.slices = fileGroup.getAllRawFileSlices().map(FileSliceDTO::fromFileSlice).collect(Collectors.toList());
-    dto.timeline = TimelineDTO.fromTimeline(fileGroup.getTimeline());
+    if (includeTimeline) {
+      dto.timeline = TimelineDTO.fromTimeline(fileGroup.getTimeline());
+    }
     return dto;
   }
 
-  public static HoodieFileGroup toFileGroup(FileGroupDTO dto, HoodieTableMetaClient metaClient) {
-    HoodieFileGroup fileGroup =
-        new HoodieFileGroup(dto.partition, dto.id, TimelineDTO.toTimeline(dto.timeline, metaClient));
-    dto.slices.stream().map(FileSliceDTO::toFileSlice).forEach(fileSlice -> fileGroup.addFileSlice(fileSlice));
+  public static HoodieFileGroup toFileGroup(FileGroupDTO dto, HoodieTimeline fgTimeline) {
+    HoodieFileGroup fileGroup = new HoodieFileGroup(dto.partition, dto.id, fgTimeline);
+    dto.slices.stream().map(FileSliceDTO::toFileSlice).forEach(fileGroup::addFileSlice);
     return fileGroup;
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index e5895c84695..c1772db6bfc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
 import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
 import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
+import org.apache.hudi.common.table.timeline.dto.DTOUtils;
 import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
 import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
 import org.apache.hudi.common.table.timeline.dto.InstantDTO;
@@ -398,7 +399,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     try {
       List<FileGroupDTO> fileGroups = executeRequest(ALL_FILEGROUPS_FOR_PARTITION_URL, paramsMap,
           new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
-      return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient));
+      return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
     }
@@ -410,7 +411,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     try {
       List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, paramsMap,
           new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
-      return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient));
+      return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
     }
@@ -422,7 +423,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     try {
       List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap,
           new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
-      return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient));
+      return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
     }
@@ -434,7 +435,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     try {
       List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_PARTITION, paramsMap,
           new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
-      return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient));
+      return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
     }
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index aabec3d57a4..9f18cc3453b 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -160,7 +160,7 @@ public class RequestHandler {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient
           + "], localTimeline=" + localTimeline.getInstants());
-    }
+    } 
 
     if ((!localTimeline.getInstantsAsStream().findAny().isPresent())
         && HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
index caf1e3c9399..44e12855ac7 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
@@ -21,8 +21,10 @@ package org.apache.hudi.timeline.service.handlers;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
+import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
 import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
+import org.apache.hudi.common.table.timeline.dto.DTOUtils;
 import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
 import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
@@ -95,23 +97,27 @@ public class FileSliceHandler extends Handler {
   }
 
   public List<FileGroupDTO> getAllFileGroups(String basePath, String partitionPath) {
-    return viewManager.getFileSystemView(basePath).getAllFileGroups(partitionPath).map(FileGroupDTO::fromFileGroup)
+    List<HoodieFileGroup> fileGroups =  viewManager.getFileSystemView(basePath).getAllFileGroups(partitionPath)
         .collect(Collectors.toList());
+    return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups);
   }
 
   public List<FileGroupDTO> getReplacedFileGroupsBeforeOrOn(String basePath, String maxCommitTime, String partitionPath) {
-    return viewManager.getFileSystemView(basePath).getReplacedFileGroupsBeforeOrOn(maxCommitTime, partitionPath).map(FileGroupDTO::fromFileGroup)
+    List<HoodieFileGroup> fileGroups =  viewManager.getFileSystemView(basePath).getReplacedFileGroupsBeforeOrOn(maxCommitTime, partitionPath)
         .collect(Collectors.toList());
+    return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups);
   }
 
   public List<FileGroupDTO> getReplacedFileGroupsBefore(String basePath, String maxCommitTime, String partitionPath) {
-    return viewManager.getFileSystemView(basePath).getReplacedFileGroupsBefore(maxCommitTime, partitionPath).map(FileGroupDTO::fromFileGroup)
+    List<HoodieFileGroup> fileGroups = viewManager.getFileSystemView(basePath).getReplacedFileGroupsBefore(maxCommitTime, partitionPath)
         .collect(Collectors.toList());
+    return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups);
   }
   
   public List<FileGroupDTO> getAllReplacedFileGroups(String basePath, String partitionPath) {
-    return viewManager.getFileSystemView(basePath).getAllReplacedFileGroups(partitionPath).map(FileGroupDTO::fromFileGroup)
+    List<HoodieFileGroup> fileGroups =  viewManager.getFileSystemView(basePath).getAllReplacedFileGroups(partitionPath)
         .collect(Collectors.toList());
+    return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups);
   }
 
   public List<ClusteringOpDTO> getFileGroupsInPendingClustering(String basePath) {
diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
index 0836a7558a2..c9a103e5264 100644
--- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
+++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
@@ -21,23 +21,37 @@ package org.apache.hudi.timeline.service.functional;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.dto.DTOUtils;
+import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView;
+import org.apache.hudi.common.testutils.MockHoodieTimeline;
 import org.apache.hudi.exception.HoodieRemoteException;
 import org.apache.hudi.timeline.service.TimelineService;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /**
@@ -124,4 +138,44 @@ public class TestRemoteHoodieTableFileSystemView extends TestHoodieTableFileSyst
         .forEach(t -> assertTrue(t.isDaemon()));
     server.close();
   }
+
+  @Test
+  public void testListFileGroupDTOPayload() throws IOException, NoSuchFieldException, IllegalAccessException {
+    ObjectMapper mapper = new ObjectMapper();
+    List<HoodieFileGroup> fileGroups = new ArrayList<>();
+    fileGroups.add(createHoodieFileGroup());
+    fileGroups.add(createHoodieFileGroup());
+    fileGroups.add(createHoodieFileGroup());
+
+    // Timeline exists only in the first file group DTO. Optimisation to reduce payload size.
+    Field timelineDTOField = FileGroupDTO.class.getDeclaredField("timeline");
+    timelineDTOField.setAccessible(true);
+    List<FileGroupDTO> fileGroupDTOs = DTOUtils.fileGroupDTOsfromFileGroups(fileGroups);
+    assertNotNull(timelineDTOField.get(fileGroupDTOs.get(0)));
+    // Verify other DTO objects do not contain timeline
+    assertNull(timelineDTOField.get(fileGroupDTOs.get(1)));
+    assertNull(timelineDTOField.get(fileGroupDTOs.get(2)));
+
+    String prettyResult = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(fileGroupDTOs);
+    String normalResult = mapper.writeValueAsString(fileGroupDTOs);
+
+    Stream<HoodieFileGroup> prettyFileGroups = readFileGroupStream(prettyResult, mapper);
+    Stream<HoodieFileGroup> normalFileGroups = readFileGroupStream(normalResult, mapper);
+    // FileGroupDTO.toFileGroup should make sure Timeline is repopulated to all the FileGroups
+    prettyFileGroups.forEach(g -> assertNotNull(g.getTimeline()));
+    normalFileGroups.forEach(g -> assertNotNull(g.getTimeline()));
+  }
+
+  private Stream<HoodieFileGroup> readFileGroupStream(String result, ObjectMapper mapper) throws IOException {
+    return DTOUtils.fileGroupDTOsToFileGroups((List<FileGroupDTO>) mapper.readValue(result, new TypeReference<List<FileGroupDTO>>() {}),
+        metaClient);
+  }
+
+  private HoodieFileGroup createHoodieFileGroup() {
+    Stream<String> completed = Stream.of("001");
+    Stream<String> inflight = Stream.of("002");
+    MockHoodieTimeline activeTimeline = new MockHoodieTimeline(completed, inflight);
+    return new HoodieFileGroup("", "data",
+        activeTimeline.getCommitsTimeline().filterCompletedInstants());
+  }
 }