Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/12 17:03:09 UTC

[doris] 25/33: [Improvement](multi catalog)Cache File for Hive Table, instead of cache file splits. (#18419)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch doris-for-zhongjin
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 097bd53f3993cc501b1b20d975d8ee8c2199f2cb
Author: Jibing-Li <64...@users.noreply.github.com>
AuthorDate: Fri Apr 7 00:07:23 2023 +0800

    [Improvement](multi catalog)Cache File for Hive Table, instead of cache file splits. (#18419)
    
    Currently, the session variable that controls the split size does not take effect once the file splits are cached.
    1. This PR caches the files of a Hive table instead of the file splits, and re-splits them on every query using the current split size.
    2. Use the self splitter by default.
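    
    A simplified, standalone sketch of the behavior this enables. The names
    CachedFile and Split are illustrative, not the actual Doris classes; the
    128MB floor and the 1.1 slack factor mirror HiveSplitter.splitFile in
    this patch:
    
        import java.util.ArrayList;
        import java.util.List;
        
        public class SplitSketch {
            static final long DEFAULT_SPLIT_SIZE = 128L * 1024 * 1024; // 128MB minimum
        
            // What the cache now holds per file: metadata only, never splits.
            record CachedFile(String path, long length, long blockSize) {}
        
            // Splits are recomputed from that metadata on every query.
            record Split(String path, long start, long length) {}
        
            static List<Split> splitFile(CachedFile file, long sessionSplitSize) {
                // A non-positive session value falls back to the file's block size.
                long splitSize = sessionSplitSize > 0 ? sessionSplitSize : file.blockSize();
                splitSize = Math.max(splitSize, DEFAULT_SPLIT_SIZE);
                List<Split> splits = new ArrayList<>();
                long remaining = file.length();
                // Keep cutting full-size splits while the remainder is more than
                // 10% larger than one split, so the tail is never a sliver.
                while ((double) remaining / (double) splitSize > 1.1D) {
                    splits.add(new Split(file.path(), file.length() - remaining, splitSize));
                    remaining -= splitSize;
                }
                if (remaining != 0) {
                    splits.add(new Split(file.path(), file.length() - remaining, remaining));
                }
                return splits;
            }
        
            public static void main(String[] args) {
                CachedFile f = new CachedFile("hdfs://nn/warehouse/t/part-0",
                        1L << 30, 256L * 1024 * 1024); // 1GB file
                // Because only file metadata is cached, changing the session
                // split size between queries now changes the plan:
                System.out.println(splitFile(f, 256L * 1024 * 1024).size()); // 4
                System.out.println(splitFile(f, 512L * 1024 * 1024).size()); // 2
            }
        }
    
    With the old cache, both queries above would have reused whatever splits
    were computed when the cache entry was first loaded.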
---
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  85 +++++++++++-----
 .../doris/planner/external/HiveSplitter.java       | 110 +++++++++++----------
 .../apache/doris/datasource/CatalogMgrTest.java    |   4 +-
 3 files changed, 118 insertions(+), 81 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 548e06d65b..39532a6785 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -36,6 +36,7 @@ import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+import org.apache.doris.planner.Split;
 import org.apache.doris.planner.external.FileSplit;
 import org.apache.doris.planner.external.HiveSplitter;
 
@@ -44,7 +45,6 @@ import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
@@ -53,6 +53,8 @@ import com.google.common.collect.TreeRangeMap;
 import lombok.Data;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -97,7 +99,7 @@ public class HiveMetaStoreCache {
     // cache from <dbname-tblname-partition_values> -> <partition info>
     private LoadingCache<PartitionCacheKey, HivePartition> partitionCache;
     // the ref of cache from <location> -> <file list>
-    private volatile AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> fileCacheRef
+    private volatile AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> fileCacheRef
             = new AtomicReference<>();
 
     public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) {
@@ -148,10 +150,10 @@ public class HiveMetaStoreCache {
         }
         // if the file.meta.cache.ttl-second is equal 0, use the synchronous loader
         // if the file.meta.cache.ttl-second greater than 0, use the asynchronous loader
-        CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader = getGuavaCacheLoader(executor,
+        CacheLoader<FileCacheKey, FileCacheValue> loader = getGuavaCacheLoader(executor,
                 fileMetaCacheTtlSecond);
 
-        LoadingCache<FileCacheKey, ImmutableList<FileSplit>> preFileCache = fileCacheRef.get();
+        LoadingCache<FileCacheKey, FileCacheValue> preFileCache = fileCacheRef.get();
 
         fileCacheRef.set(fileCacheBuilder.build(loader));
         if (Objects.nonNull(preFileCache)) {
@@ -262,7 +264,7 @@ public class HiveMetaStoreCache {
         return new HivePartition(sd.getInputFormat(), sd.getLocation(), key.values);
     }
 
-    private ImmutableList<FileSplit> loadFiles(FileCacheKey key) {
+    private FileCacheValue loadFiles(FileCacheKey key) {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
@@ -279,11 +281,11 @@ public class HiveMetaStoreCache {
             jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
             FileInputFormat.setInputPaths(jobConf, finalLocation);
             try {
-                FileSplit[] result;
+                FileCacheValue result;
                 InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
                 // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
                 if (key.useSelfSplitter) {
-                    result = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
+                    result = HiveSplitter.getFileCache(new Path(finalLocation), inputFormat, jobConf);
                 } else {
                     InputSplit[] splits;
                     String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@@ -294,18 +296,18 @@ public class HiveMetaStoreCache {
                     } else {
                         splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
                     }
-                    result = new FileSplit[splits.length];
+                    result = new FileCacheValue();
                     // Convert the hadoop split to Doris Split.
                     for (int i = 0; i < splits.length; i++) {
                         org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
-                        result[i] =  new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null);
+                        result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null));
                     }
                 }
 
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("load #{} splits for {} in catalog {}", result.length, key, catalog.getName());
+                    LOG.debug("load #{} splits for {} in catalog {}", result.getFiles().size(), key, catalog.getName());
                 }
-                return ImmutableList.copyOf(result);
+                return result;
             } catch (Exception e) {
                 throw new CacheException("failed to get input splits for %s in catalog %s", e, key, catalog.getName());
             }
@@ -353,7 +355,7 @@ public class HiveMetaStoreCache {
         }
     }
 
-    public List<FileSplit> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
+    public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
         partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
@@ -364,19 +366,18 @@ public class HiveMetaStoreCache {
         } else {
             stream = keys.parallelStream();
         }
-        List<ImmutableList<FileSplit>> fileLists = stream.map(k -> {
+        List<FileCacheValue> fileLists = stream.map(k -> {
             try {
                 return fileCacheRef.get().get(k);
             } catch (ExecutionException e) {
                 throw new RuntimeException(e);
             }
         }).collect(Collectors.toList());
-        List<FileSplit> retFiles = Lists.newArrayListWithExpectedSize(
-                fileLists.stream().mapToInt(l -> l.size()).sum());
-        fileLists.stream().forEach(l -> retFiles.addAll(l));
         LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms",
-                retFiles.size(), partitions.size(), catalog.getName(), (System.currentTimeMillis() - start));
-        return retFiles;
+                fileLists.stream().mapToInt(l -> l.getFiles() == null
+                    ? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(),
+                partitions.size(), catalog.getName(), (System.currentTimeMillis() - start));
+        return fileLists;
     }
 
     public List<HivePartition> getAllPartitions(String dbName, String name, List<List<String>> partitionValuesList) {
@@ -582,12 +583,12 @@ public class HiveMetaStoreCache {
      * @param fileMetaCacheTtlSecond
      * @return
      */
-    private CacheLoader<FileCacheKey, ImmutableList<FileSplit>> getGuavaCacheLoader(Executor executor,
+    private CacheLoader<FileCacheKey, FileCacheValue> getGuavaCacheLoader(Executor executor,
             int fileMetaCacheTtlSecond) {
-        CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader =
-                new CacheLoader<FileCacheKey, ImmutableList<FileSplit>>() {
+        CacheLoader<FileCacheKey, FileCacheValue> loader =
+                new CacheLoader<FileCacheKey, FileCacheValue>() {
                     @Override
-                    public ImmutableList<FileSplit> load(FileCacheKey key) throws Exception {
+                    public FileCacheValue load(FileCacheKey key) throws Exception {
                         return loadFiles(key);
                     }
                 };
@@ -602,7 +603,7 @@ public class HiveMetaStoreCache {
      * get fileCache ref
      * @return
      */
-    public AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> getFileCacheRef() {
+    public AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> getFileCacheRef() {
         return fileCacheRef;
     }
 
@@ -694,7 +695,7 @@ public class HiveMetaStoreCache {
         public FileCacheKey(String location, String inputFormat) {
             this.location = location;
             this.inputFormat = inputFormat;
-            this.useSelfSplitter = false;
+            this.useSelfSplitter = true;
         }
 
         public FileCacheKey(String location, String inputFormat, boolean useSelfSplitter) {
@@ -725,6 +726,42 @@ public class HiveMetaStoreCache {
         }
     }
 
+    @Data
+    public static class FileCacheValue {
+        // File Cache for self splitter.
+        private List<HiveFileStatus> files;
+        // File split cache for old splitter. This is a temp variable.
+        private List<Split> splits;
+        private boolean isSplittable;
+
+        public void addFile(LocatedFileStatus file) {
+            if (files == null) {
+                files = Lists.newArrayList();
+            }
+            HiveFileStatus status = new HiveFileStatus();
+            status.setBlockLocations(file.getBlockLocations());
+            status.setPath(file.getPath());
+            status.length = file.getLen();
+            status.blockSize = file.getBlockSize();
+            files.add(status);
+        }
+
+        public void addSplit(Split split) {
+            if (splits == null) {
+                splits = Lists.newArrayList();
+            }
+            splits.add(split);
+        }
+    }
+
+    @Data
+    public static class HiveFileStatus {
+        BlockLocation[] blockLocations;
+        Path path;
+        long length;
+        long blockSize;
+    }
+
     @Data
     public static class HivePartitionValues {
         private long nextPartitionId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
index 5a3af95c6d..b17704251a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -35,7 +35,6 @@ import org.apache.doris.planner.Split;
 import org.apache.doris.planner.Splitter;
 import org.apache.doris.qe.ConnectContext;
 
-import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,7 +50,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class HiveSplitter implements Splitter {
 
@@ -81,11 +79,11 @@ public class HiveSplitter implements Splitter {
                     partitionColumnTypes);
             }
             Map<String, String> properties = hmsTable.getCatalog().getCatalogProperty().getProperties();
-            boolean useSelfSplitter = false;
+            boolean useSelfSplitter = true;
             if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
-                    && properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("true")) {
+                    && properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("false")) {
                 LOG.debug("Using self splitter for hmsTable {}", hmsTable.getName());
-                useSelfSplitter = true;
+                useSelfSplitter = false;
             }
 
             List<Split> allFiles = Lists.newArrayList();
@@ -135,15 +133,53 @@ public class HiveSplitter implements Splitter {
     }
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
-                                          List<Split> allFiles, boolean useSelfSplitter) {
-        List<FileSplit> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(),
-                    Joiner.on(",")
-                    .join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath())
-                        .collect(Collectors.toList())));
+                                          List<Split> allFiles, boolean useSelfSplitter) throws IOException {
+        for (HiveMetaStoreCache.FileCacheValue fileCacheValue :
+                cache.getFilesByPartitions(partitions, useSelfSplitter)) {
+            if (fileCacheValue.getSplits() != null) {
+                allFiles.addAll(fileCacheValue.getSplits());
+            }
+            if (fileCacheValue.getFiles() != null) {
+                boolean isSplittable = fileCacheValue.isSplittable();
+                for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
+                    allFiles.addAll(splitFile(status, isSplittable));
+                }
+            }
         }
-        allFiles.addAll(files);
+    }
+
+    private List<Split> splitFile(HiveMetaStoreCache.HiveFileStatus status, boolean splittable) throws IOException {
+        List<Split> result = Lists.newArrayList();
+        if (!splittable) {
+            LOG.debug("Path {} is not splittable.", status.getPath());
+            BlockLocation block = status.getBlockLocations()[0];
+            result.add(new FileSplit(status.getPath(), 0, status.getLength(),
+                    status.getLength(), block.getHosts()));
+            return result;
+        }
+        long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
+        if (splitSize <= 0) {
+            splitSize = status.getBlockSize();
+        }
+        // Min split size is DEFAULT_SPLIT_SIZE(128MB).
+        splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
+        BlockLocation[] blockLocations = status.getBlockLocations();
+        long length = status.getLength();
+        long bytesRemaining;
+        for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
+                bytesRemaining -= splitSize) {
+            int location = getBlockIndex(blockLocations, length - bytesRemaining);
+            result.add(new FileSplit(status.getPath(), length - bytesRemaining,
+                    splitSize, length, blockLocations[location].getHosts()));
+        }
+        if (bytesRemaining != 0L) {
+            int location = getBlockIndex(blockLocations, length - bytesRemaining);
+            result.add(new FileSplit(status.getPath(), length - bytesRemaining,
+                    bytesRemaining, length, blockLocations[location].getHosts()));
+        }
+
+        LOG.debug("Path {} includes {} splits.", status.getPath(), result.size());
+        return result;
     }
 
     public int getTotalPartitionNum() {
@@ -154,52 +190,18 @@ public class HiveSplitter implements Splitter {
         return readPartitionNum;
     }
 
-    // Get splits by using FileSystem API, the splits are blocks in HDFS or S3 like storage system.
-    public static FileSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
-                                             JobConf jobConf) throws IOException {
+    // Get File Status by using FileSystem API.
+    public static HiveMetaStoreCache.FileCacheValue getFileCache(Path path, InputFormat<?, ?> inputFormat,
+                                                                  JobConf jobConf) throws IOException {
         FileSystem fs = path.getFileSystem(jobConf);
         boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
-        List<FileSplit> splits = Lists.newArrayList();
         RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, true);
-        if (!locatedFileStatusRemoteIterator.hasNext()) {
-            LOG.debug("File status for path {} is empty.", path);
-            return new FileSplit[0];
-        }
-        if (!splittable) {
-            LOG.debug("Path {} is not splittable.", path);
-            while (locatedFileStatusRemoteIterator.hasNext()) {
-                LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
-                BlockLocation block = status.getBlockLocations()[0];
-                splits.add(new FileSplit(status.getPath(), 0, status.getLen(), status.getLen(), block.getHosts()));
-            }
-            return splits.toArray(new FileSplit[splits.size()]);
-        }
-        long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
+        HiveMetaStoreCache.FileCacheValue result = new HiveMetaStoreCache.FileCacheValue();
+        result.setSplittable(splittable);
         while (locatedFileStatusRemoteIterator.hasNext()) {
-            LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
-            if (splitSize <= 0) {
-                splitSize = status.getBlockSize();
-            }
-            // Min split size is DEFAULT_SPLIT_SIZE(128MB).
-            splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
-            BlockLocation[] blockLocations = status.getBlockLocations();
-            long length = status.getLen();
-            long bytesRemaining;
-            for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
-                    bytesRemaining -= splitSize) {
-                int location = getBlockIndex(blockLocations, length - bytesRemaining);
-                splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                        splitSize, length, blockLocations[location].getHosts()));
-            }
-            if (bytesRemaining != 0L) {
-                int location = getBlockIndex(blockLocations, length - bytesRemaining);
-                splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                        bytesRemaining, length, blockLocations[location].getHosts()));
-            }
+            result.addFile(locatedFileStatusRemoteIterator.next());
         }
-
-        LOG.debug("Path {} includes {} splits.", path, splits.size());
-        return splits.toArray(new FileSplit[splits.size()]);
+        return result;
     }
 
     private static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
index 67416619b0..10692a5f68 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
@@ -54,7 +54,6 @@ import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
-import org.apache.doris.planner.external.FileSplit;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSet;
 import org.apache.doris.system.SystemInfoService;
@@ -62,7 +61,6 @@ import org.apache.doris.utframe.TestWithFeService;
 
 import com.google.common.base.Preconditions;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
@@ -678,7 +676,7 @@ public class CatalogMgrTest extends TestWithFeService {
 
         HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog(catalogName);
         HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
-        LoadingCache<FileCacheKey, ImmutableList<FileSplit>> preFileCache = metaStoreCache.getFileCacheRef().get();
+        LoadingCache<FileCacheKey, HiveMetaStoreCache.FileCacheValue> preFileCache = metaStoreCache.getFileCacheRef().get();
 
 
         // 1. properties contains `file.meta.cache.ttl-second`, it should not be equal

