Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/12 17:02:51 UTC

[doris] 07/33: [Improvement](multi catalog)Change hive metastore cache split value type to Doris defined Split. Fix split file length -1 bug (#18319)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch doris-for-zhongjin
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 923fca6e1a0fd1a340841d9e4986a20d77468d13
Author: Jibing-Li <64...@users.noreply.github.com>
AuthorDate: Mon Apr 3 13:54:28 2023 +0800

    [Improvement](multi catalog)Change hive metastore cache split value type to Doris defined Split. Fix split file length -1 bug (#18319)
    
    The value type of the file split cache in HiveMetaStoreCache was Hadoop's
    InputSplit. This PR changes it to the Doris-defined Split, which avoids
    converting the splits on every cache read. It also fixes the explain verbose
    output, which returned -1 for the split file length.
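
For readers skimming the diff below: the core of the change is that the Guava
file cache now stores Doris FileSplit values directly, so any Hadoop-to-Doris
conversion happens once when an entry is loaded rather than on every cache hit.
A minimal sketch of the new cache shape, assuming simplified stand-ins for the
real fe-core types (FileCacheKey, FileSplit, loadFiles), not the exact code:

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.google.common.collect.ImmutableList;

    class FileCacheSketch {
        // Hypothetical stand-ins for the real fe-core types.
        static class FileCacheKey {}
        static class FileSplit {}

        // The cache value is now the Doris-side FileSplit, not Hadoop's InputSplit.
        final LoadingCache<FileCacheKey, ImmutableList<FileSplit>> fileCache =
                CacheBuilder.newBuilder()
                        .maximumSize(100_000)
                        .build(new CacheLoader<FileCacheKey, ImmutableList<FileSplit>>() {
                            @Override
                            public ImmutableList<FileSplit> load(FileCacheKey key) {
                                // Any Hadoop InputSplit -> FileSplit conversion
                                // happens here, once per key, at load time.
                                return loadFiles(key);
                            }
                        });

        ImmutableList<FileSplit> loadFiles(FileCacheKey key) {
            return ImmutableList.of(); // placeholder; the real loader lists files
        }
    }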
---
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 40 +++++++++++++---------
 .../planner/external/ExternalFileScanNode.java     |  6 ++--
 .../doris/planner/external/HiveSplitter.java       | 28 ++++++---------
 .../apache/doris/datasource/CatalogMgrTest.java    |  4 +--
 4 files changed, 39 insertions(+), 39 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index d963b52c7f..548e06d65b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -36,6 +36,7 @@ import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+import org.apache.doris.planner.external.FileSplit;
 import org.apache.doris.planner.external.HiveSplitter;
 
 import com.google.common.base.Preconditions;
@@ -96,7 +97,7 @@ public class HiveMetaStoreCache {
     // cache from <dbname-tblname-partition_values> -> <partition info>
     private LoadingCache<PartitionCacheKey, HivePartition> partitionCache;
     // the ref of cache from <location> -> <file list>
-    private volatile AtomicReference<LoadingCache<FileCacheKey, ImmutableList<InputSplit>>> fileCacheRef
+    private volatile AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> fileCacheRef
             = new AtomicReference<>();
 
     public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) {
@@ -147,10 +148,10 @@ public class HiveMetaStoreCache {
         }
         // if the file.meta.cache.ttl-second is equal 0, use the synchronous loader
         // if the file.meta.cache.ttl-second greater than 0, use the asynchronous loader
-        CacheLoader<FileCacheKey, ImmutableList<InputSplit>> loader = getGuavaCacheLoader(executor,
+        CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader = getGuavaCacheLoader(executor,
                 fileMetaCacheTtlSecond);
 
-        LoadingCache<FileCacheKey, ImmutableList<InputSplit>> preFileCache = fileCacheRef.get();
+        LoadingCache<FileCacheKey, ImmutableList<FileSplit>> preFileCache = fileCacheRef.get();
 
         fileCacheRef.set(fileCacheBuilder.build(loader));
         if (Objects.nonNull(preFileCache)) {
@@ -261,7 +262,7 @@ public class HiveMetaStoreCache {
         return new HivePartition(sd.getInputFormat(), sd.getLocation(), key.values);
     }
 
-    private ImmutableList<InputSplit> loadFiles(FileCacheKey key) {
+    private ImmutableList<FileSplit> loadFiles(FileCacheKey key) {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
@@ -278,12 +279,13 @@ public class HiveMetaStoreCache {
             jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
             FileInputFormat.setInputPaths(jobConf, finalLocation);
             try {
+                FileSplit[] result;
                 InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
-                InputSplit[] splits;
                 // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
                 if (key.useSelfSplitter) {
-                    splits = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
+                    result = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
                 } else {
+                    InputSplit[] splits;
                     String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
                     if (!Strings.isNullOrEmpty(remoteUser)) {
                         UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
@@ -292,12 +294,18 @@ public class HiveMetaStoreCache {
                     } else {
                         splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
                     }
+                    result = new FileSplit[splits.length];
+                    // Convert the hadoop split to Doris Split.
+                    for (int i = 0; i < splits.length; i++) {
+                        org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
+                        result[i] =  new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null);
+                    }
                 }
 
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("load #{} files for {} in catalog {}", splits.length, key, catalog.getName());
+                    LOG.debug("load #{} splits for {} in catalog {}", result.length, key, catalog.getName());
                 }
-                return ImmutableList.copyOf(splits);
+                return ImmutableList.copyOf(result);
             } catch (Exception e) {
                 throw new CacheException("failed to get input splits for %s in catalog %s", e, key, catalog.getName());
             }
@@ -345,7 +353,7 @@ public class HiveMetaStoreCache {
         }
     }
 
-    public List<InputSplit> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
+    public List<FileSplit> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
         partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
@@ -356,14 +364,14 @@ public class HiveMetaStoreCache {
         } else {
             stream = keys.parallelStream();
         }
-        List<ImmutableList<InputSplit>> fileLists = stream.map(k -> {
+        List<ImmutableList<FileSplit>> fileLists = stream.map(k -> {
             try {
                 return fileCacheRef.get().get(k);
             } catch (ExecutionException e) {
                 throw new RuntimeException(e);
             }
         }).collect(Collectors.toList());
-        List<InputSplit> retFiles = Lists.newArrayListWithExpectedSize(
+        List<FileSplit> retFiles = Lists.newArrayListWithExpectedSize(
                 fileLists.stream().mapToInt(l -> l.size()).sum());
         fileLists.stream().forEach(l -> retFiles.addAll(l));
         LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms",
@@ -574,12 +582,12 @@ public class HiveMetaStoreCache {
      * @param fileMetaCacheTtlSecond
      * @return
      */
-    private CacheLoader<FileCacheKey, ImmutableList<InputSplit>> getGuavaCacheLoader(Executor executor,
+    private CacheLoader<FileCacheKey, ImmutableList<FileSplit>> getGuavaCacheLoader(Executor executor,
             int fileMetaCacheTtlSecond) {
-        CacheLoader<FileCacheKey, ImmutableList<InputSplit>> loader =
-                new CacheLoader<FileCacheKey, ImmutableList<InputSplit>>() {
+        CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader =
+                new CacheLoader<FileCacheKey, ImmutableList<FileSplit>>() {
                     @Override
-                    public ImmutableList<InputSplit> load(FileCacheKey key) throws Exception {
+                    public ImmutableList<FileSplit> load(FileCacheKey key) throws Exception {
                         return loadFiles(key);
                     }
                 };
@@ -594,7 +602,7 @@ public class HiveMetaStoreCache {
      * get fileCache ref
      * @return
      */
-    public AtomicReference<LoadingCache<FileCacheKey, ImmutableList<InputSplit>>> getFileCacheRef() {
+    public AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> getFileCacheRef() {
         return fileCacheRef;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index e22025800d..6c6089b16c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -758,7 +758,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
                     for (TFileRangeDesc file : fileRangeDescs) {
                         output.append(prefix).append("    ").append(file.getPath())
                                 .append(" start: ").append(file.getStartOffset())
-                                .append(" length: ").append(file.getFileSize())
+                                .append(" length: ").append(file.getSize())
                                 .append("\n");
                     }
                 } else {
@@ -766,7 +766,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
                         TFileRangeDesc file = fileRangeDescs.get(i);
                         output.append(prefix).append("    ").append(file.getPath())
                                 .append(" start: ").append(file.getStartOffset())
-                                .append(" length: ").append(file.getFileSize())
+                                .append(" length: ").append(file.getSize())
                                 .append("\n");
                     }
                     int other = size - 4;
@@ -774,7 +774,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
                     TFileRangeDesc file = fileRangeDescs.get(size - 1);
                     output.append(prefix).append("    ").append(file.getPath())
                             .append(" start: ").append(file.getStartOffset())
-                            .append(" length: ").append(file.getFileSize())
+                            .append(" length: ").append(file.getSize())
                             .append("\n");
                 }
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
index f7f09b6da6..9c8dec303b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -42,9 +42,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -138,20 +136,14 @@ public class HiveSplitter implements Splitter {
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
                                           List<Split> allFiles, boolean useSelfSplitter) {
-        List<InputSplit> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
+        List<FileSplit> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
         if (LOG.isDebugEnabled()) {
             LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(),
                     Joiner.on(",")
                     .join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath())
                         .collect(Collectors.toList())));
         }
-        allFiles.addAll(files.stream().map(file -> {
-            FileSplit fs = (FileSplit) file;
-            org.apache.doris.planner.external.FileSplit split = new org.apache.doris.planner.external.FileSplit(
-                    fs.getPath(), fs.getStart(), fs.getLength(), -1, null
-            );
-            return split;
-        }).collect(Collectors.toList()));
+        allFiles.addAll(files);
     }
 
     public int getTotalPartitionNum() {
@@ -163,24 +155,24 @@ public class HiveSplitter implements Splitter {
     }
 
     // Get splits by using FileSystem API, the splits are blocks in HDFS or S3 like storage system.
-    public static InputSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
+    public static FileSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
                                              JobConf jobConf) throws IOException {
         FileSystem fs = path.getFileSystem(jobConf);
         boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
-        List<InputSplit> splits = Lists.newArrayList();
+        List<FileSplit> splits = Lists.newArrayList();
         RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, true);
         if (!locatedFileStatusRemoteIterator.hasNext()) {
             LOG.debug("File status for path {} is empty.", path);
-            return new InputSplit[0];
+            return new FileSplit[0];
         }
         if (!splittable) {
             LOG.debug("Path {} is not splittable.", path);
             while (locatedFileStatusRemoteIterator.hasNext()) {
                 LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
                 BlockLocation block = status.getBlockLocations()[0];
-                splits.add(new FileSplit(status.getPath(), 0, status.getLen(), block.getHosts()));
+                splits.add(new FileSplit(status.getPath(), 0, status.getLen(), status.getLen(), block.getHosts()));
             }
-            return splits.toArray(new InputSplit[splits.size()]);
+            return splits.toArray(new FileSplit[splits.size()]);
         }
         long splitSize = Config.file_split_size;
         boolean useDefaultBlockSize = (splitSize <= 0);
@@ -196,17 +188,17 @@ public class HiveSplitter implements Splitter {
                     bytesRemaining -= splitSize) {
                 int location = getBlockIndex(blockLocations, length - bytesRemaining);
                 splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                        splitSize, blockLocations[location].getHosts()));
+                        splitSize, length, blockLocations[location].getHosts()));
             }
             if (bytesRemaining != 0L) {
                 int location = getBlockIndex(blockLocations, length - bytesRemaining);
                 splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                        bytesRemaining, blockLocations[location].getHosts()));
+                        bytesRemaining, length, blockLocations[location].getHosts()));
             }
         }
 
         LOG.debug("Path {} includes {} splits.", path, splits.size());
-        return splits.toArray(new InputSplit[splits.size()]);
+        return splits.toArray(new FileSplit[splits.size()]);
     }
 
     private static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
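
For context on the self-splitter hunk above: getHiveSplits walks each file,
carves it into splitSize ranges, and uses getBlockIndex to tag each range with
the hosts of the HDFS block containing its start offset; the file length now
travels in every FileSplit instead of the hard-coded -1. A condensed,
self-contained sketch of the range-carving loop (simplified types and a
simplified loop condition; not the exact fe-core code):

    import java.util.ArrayList;
    import java.util.List;

    class SplitLoopSketch {
        // Each long[] is {startOffset, rangeLength, fileLength}. Carving mirrors
        // the loop above: full splitSize ranges, then one tail range for the rest.
        static List<long[]> carve(long fileLength, long splitSize) {
            List<long[]> ranges = new ArrayList<>();
            long remaining = fileLength;
            for (; remaining > splitSize; remaining -= splitSize) {
                ranges.add(new long[] {fileLength - remaining, splitSize, fileLength});
            }
            if (remaining > 0) { // tail range with the leftover bytes
                ranges.add(new long[] {fileLength - remaining, remaining, fileLength});
            }
            return ranges;
        }
    }

For a 10-byte file with splitSize 4, carve(10, 4) yields {0,4,10}, {4,4,10},
{8,2,10}: two full ranges plus the tail, each carrying the full file length.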
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
index c93be4a3ac..67416619b0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
@@ -54,6 +54,7 @@ import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+import org.apache.doris.planner.external.FileSplit;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSet;
 import org.apache.doris.system.SystemInfoService;
@@ -66,7 +67,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
-import org.apache.hadoop.mapred.InputSplit;
 import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -678,7 +678,7 @@ public class CatalogMgrTest extends TestWithFeService {
 
         HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog(catalogName);
         HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
-        LoadingCache<FileCacheKey, ImmutableList<InputSplit>> preFileCache = metaStoreCache.getFileCacheRef().get();
+        LoadingCache<FileCacheKey, ImmutableList<FileSplit>> preFileCache = metaStoreCache.getFileCacheRef().get();
 
 
         // 1. properties contains `file.meta.cache.ttl-second`, it should not be equal


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org