You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/10 04:24:03 UTC

[doris] branch master updated: [feature](file cache)Import `file cache` for remote file reader (#15622)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f17d69e450 [feature](file cache)Import `file cache` for remote file reader (#15622)
f17d69e450 is described below

commit f17d69e450bf1281356edd374e8e45023bca01fa
Author: Tiewei Fang <43...@users.noreply.github.com>
AuthorDate: Tue Jan 10 12:23:56 2023 +0800

    [feature](file cache)Import `file cache` for remote file reader (#15622)
    
    The main purpose of this pr is to import `fileCache` for lakehouse reading remote files.
    Use the local disk as the cache for reading remote file, so the next time this file is read,
    the data can be obtained directly from the local disk.
    In addition, this pr includes a few other minor changes
    
    Import File Cache:
    1. The imported `fileCache` is called `block_file_cache`, which uses lru replacement policy.
    2. Implement a new FileRereader `CachedRemoteFilereader`, so that the logic of `file cache` is hidden under `CachedRemoteFilereader`.
    
    Other changes:
    1. Add a new interface `fs()` for `FileReader`.
    2. `IOContext` adds some statistical information to count the situation of `FileCache`
    
    Co-authored-by: Lightman <31...@users.noreply.github.com>
---
 be/src/common/config.h                             |   9 +
 be/src/io/CMakeLists.txt                           |   6 +
 be/src/io/cache/block/block_file_cache.cpp         | 125 +++
 be/src/io/cache/block/block_file_cache.h           | 252 ++++++
 be/src/io/cache/block/block_file_cache_factory.cpp |  94 +++
 be/src/io/cache/block/block_file_cache_factory.h   |  59 ++
 .../path.h => cache/block/block_file_cache_fwd.h}  |  17 +-
 be/src/io/cache/block/block_file_cache_profile.cpp | 202 +++++
 be/src/io/cache/block/block_file_cache_profile.h   | 115 +++
 .../block/block_file_cache_settings.h}             |  18 +-
 be/src/io/cache/block/block_file_segment.cpp       | 317 ++++++++
 be/src/io/cache/block/block_file_segment.h         | 205 +++++
 be/src/io/cache/block/block_lru_file_cache.cpp     | 844 +++++++++++++++++++++
 be/src/io/cache/block/block_lru_file_cache.h       | 159 ++++
 .../io/cache/block/cached_remote_file_reader.cpp   | 215 ++++++
 be/src/io/cache/block/cached_remote_file_reader.h  |  76 ++
 be/src/io/cache/dummy_file_cache.h                 |   3 +
 be/src/io/cache/file_cache_manager.cpp             |   6 +-
 be/src/io/cache/file_cache_manager.h               |   2 +-
 be/src/io/cache/sub_file_cache.cpp                 |   2 +-
 be/src/io/cache/sub_file_cache.h                   |   3 +
 be/src/io/cache/whole_file_cache.cpp               |   3 +-
 be/src/io/cache/whole_file_cache.h                 |   3 +
 be/src/io/file_factory.cpp                         |  64 +-
 be/src/io/file_factory.h                           |  24 +-
 be/src/io/fs/broker_file_reader.cpp                |  10 +-
 be/src/io/fs/broker_file_reader.h                  |   6 +-
 be/src/io/fs/broker_file_system.cpp                |   7 +-
 be/src/io/fs/broker_file_system.h                  |   2 +-
 be/src/io/fs/file_reader.h                         |   6 +-
 be/src/io/fs/file_reader_options.cpp               |  10 +-
 be/src/io/fs/file_reader_options.h                 |  20 +-
 be/src/io/fs/file_system.h                         |   6 +-
 be/src/io/fs/hdfs_file_reader.cpp                  |   4 +-
 be/src/io/fs/hdfs_file_reader.h                    |   6 +-
 be/src/io/fs/hdfs_file_system.cpp                  |   7 +-
 be/src/io/fs/hdfs_file_system.h                    |   2 +-
 be/src/io/fs/local_file_reader.cpp                 |   8 +-
 be/src/io/fs/local_file_reader.h                   |   6 +-
 be/src/io/fs/local_file_system.cpp                 |   6 +-
 be/src/io/fs/local_file_system.h                   |   6 +-
 be/src/io/fs/path.h                                |   6 +
 be/src/io/fs/remote_file_system.cpp                |  21 +-
 be/src/io/fs/remote_file_system.h                  |   4 +-
 be/src/io/fs/s3_file_reader.cpp                    |   4 +-
 be/src/io/fs/s3_file_reader.h                      |   6 +-
 be/src/io/fs/s3_file_system.cpp                    |   8 +-
 be/src/io/fs/s3_file_system.h                      |   2 +-
 be/src/io/fs/stream_load_pipe.h                    |   3 +
 be/src/olap/iterators.h                            |  14 +
 be/src/olap/olap_common.h                          |  14 +
 be/src/olap/options.cpp                            |  65 ++
 be/src/olap/options.h                              |  17 +
 be/src/olap/rowset/segment_v2/segment.cpp          |   8 +-
 be/src/{io/fs/path.h => service/brpc_conflict.h}   |  34 +-
 be/src/service/doris_main.cpp                      |  39 +
 be/src/service/internal_service.cpp                |  12 +-
 be/src/vec/CMakeLists.txt                          |   1 +
 be/src/vec/common/hex.cpp                          | 115 +++
 be/src/vec/common/hex.h                            | 136 ++++
 be/src/vec/common/uint128.h                        |  32 +
 be/src/vec/core/block_spill_reader.cpp             |   5 +-
 be/src/vec/exec/format/csv/csv_reader.cpp          |  19 +-
 be/src/vec/exec/format/csv/csv_reader.h            |  10 +-
 .../file_reader/new_plain_binary_line_reader.cpp   |   3 +-
 be/src/vec/exec/format/json/new_json_reader.cpp    |  20 +-
 be/src/vec/exec/format/json/new_json_reader.h      |  11 +-
 be/src/vec/exec/format/orc/vorc_reader.cpp         |  21 +-
 be/src/vec/exec/format/orc/vorc_reader.h           |   9 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  16 +-
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  10 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    |  10 +-
 be/src/vec/exec/format/table/iceberg_reader.h      |  15 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |  24 +-
 be/src/vec/exec/scan/vfile_scanner.h               |   3 +
 be/test/CMakeLists.txt                             |   1 +
 be/test/io/cache/file_block_cache_test.cpp         | 544 +++++++++++++
 be/test/olap/primary_key_index_test.cpp            |   2 +-
 .../olap/rowset/segment_v2/bitmap_index_test.cpp   |   3 +-
 .../bloom_filter_index_reader_writer_test.cpp      |   2 +-
 .../segment_v2/column_reader_writer_test.cpp       |   4 +-
 .../rowset/segment_v2/ordinal_page_index_test.cpp  |   2 +-
 .../olap/rowset/segment_v2/zone_map_index_test.cpp |   6 +-
 be/test/runtime/array_test.cpp                     |   2 +-
 be/test/vec/exec/parquet/parquet_reader_test.cpp   |   5 +-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |  12 +-
 86 files changed, 4029 insertions(+), 206 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 685820ae63..91b9eecc44 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -876,6 +876,15 @@ CONF_Int32(pipeline_executor_size, "0");
 // Will remove after fully test.
 CONF_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "false");
 
+// block file cache
+CONF_Bool(enable_file_cache, "false");
+// format: [{"path":"/path/to/file_cache","normal":21474836480,"persistent":10737418240,"query_limit":10737418240}]
+CONF_String(file_cache_path, "");
+CONF_String(disposable_file_cache_path, "");
+CONF_Int64(file_cache_max_file_segment_size, "4194304"); // 4MB
+CONF_Bool(clear_file_cache, "false");
+CONF_Bool(enable_file_cache_query_limit, "false");
+
 #ifdef BE_TEST
 // test s3
 CONF_String(test_s3_resource, "resource");
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index 900026002f..684600fa37 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -52,6 +52,12 @@ set(IO_FILES
     cache/file_cache_manager.cpp
     cache/sub_file_cache.cpp
     cache/whole_file_cache.cpp
+    cache/block/block_file_segment.cpp
+    cache/block/block_file_cache.cpp
+    cache/block/block_file_cache_profile.cpp
+    cache/block/block_file_cache_factory.cpp
+    cache/block/block_lru_file_cache.cpp
+    cache/block/cached_remote_file_reader.cpp
 )
 
 add_library(IO STATIC
diff --git a/be/src/io/cache/block/block_file_cache.cpp b/be/src/io/cache/block/block_file_cache.cpp
new file mode 100644
index 0000000000..b8171653c4
--- /dev/null
+++ b/be/src/io/cache/block/block_file_cache.cpp
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
+// and modified by Doris
+
+#include "io/cache/block/block_file_cache.h"
+
+#include <filesystem>
+
+#include "io/cache/block/block_file_cache_fwd.h"
+#include "io/cache/block/block_file_cache_settings.h"
+#include "vec/common/hex.h"
+#include "vec/common/sip_hash.h"
+
+namespace fs = std::filesystem;
+
+namespace doris {
+namespace io {
+
+IFileCache::IFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings)
+        : _cache_base_path(cache_base_path),
+          _max_size(cache_settings.max_size),
+          _max_element_size(cache_settings.max_elements),
+          _persistent_max_size(cache_settings.persistent_max_size),
+          _persistent_max_element_size(cache_settings.persistent_max_elements),
+          _max_file_segment_size(cache_settings.max_file_segment_size),
+          _max_query_cache_size(cache_settings.max_query_cache_size) {}
+
+std::string IFileCache::Key::to_string() const {
+    return vectorized::get_hex_uint_lowercase(key);
+}
+
+IFileCache::Key IFileCache::hash(const std::string& path) {
+    uint128_t key;
+    sip_hash128(path.data(), path.size(), reinterpret_cast<char*>(&key));
+    return Key(key);
+}
+
+std::string IFileCache::get_path_in_local_cache(const Key& key, size_t offset,
+                                                bool is_persistent) const {
+    auto key_str = key.to_string();
+    std::string suffix = is_persistent ? "_persistent" : "";
+    return fs::path(_cache_base_path) / key_str / (std::to_string(offset) + suffix);
+}
+
+std::string IFileCache::get_path_in_local_cache(const Key& key) const {
+    auto key_str = key.to_string();
+    return fs::path(_cache_base_path) / key_str;
+}
+
+IFileCache::QueryContextHolderPtr IFileCache::get_query_context_holder(const TUniqueId& query_id) {
+    std::lock_guard cache_lock(_mutex);
+
+    if (!_enable_file_cache_query_limit) {
+        return {};
+    }
+
+    /// if enable_filesystem_query_cache_limit is true,
+    /// we create context query for current query.
+    auto context = get_or_set_query_context(query_id, cache_lock);
+    return std::make_unique<QueryContextHolder>(query_id, this, context);
+}
+
+IFileCache::QueryContextPtr IFileCache::get_query_context(const TUniqueId& query_id,
+                                                          std::lock_guard<std::mutex>& cache_lock) {
+    auto query_iter = _query_map.find(query_id);
+    return (query_iter == _query_map.end()) ? nullptr : query_iter->second;
+}
+
+void IFileCache::remove_query_context(const TUniqueId& query_id) {
+    std::lock_guard cache_lock(_mutex);
+    const auto& query_iter = _query_map.find(query_id);
+
+    if (query_iter != _query_map.end() && query_iter->second.unique()) {
+        _query_map.erase(query_iter);
+    }
+}
+
+IFileCache::QueryContextPtr IFileCache::get_or_set_query_context(
+        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
+    if (query_id.lo == 0 && query_id.hi == 0) {
+        return nullptr;
+    }
+
+    auto context = get_query_context(query_id, cache_lock);
+    if (context) {
+        return context;
+    }
+
+    auto query_context = std::make_shared<QueryContext>(_max_query_cache_size);
+    auto query_iter = _query_map.emplace(query_id, query_context).first;
+    return query_iter->second;
+}
+
+void IFileCache::QueryContext::remove(const Key& key, size_t offset, bool is_presistent,
+                                      size_t size, std::lock_guard<std::mutex>& cache_lock) {
+    auto record = records.find({key, offset, is_presistent});
+    DCHECK(record != records.end());
+    lru_queue.remove(record->second, cache_lock);
+    records.erase({key, offset, is_presistent});
+}
+
+void IFileCache::QueryContext::reserve(const Key& key, size_t offset, bool is_presistent,
+                                       size_t size, std::lock_guard<std::mutex>& cache_lock) {
+    auto queue_iter = lru_queue.add(key, offset, is_presistent, size, cache_lock);
+    records.insert({{key, offset, is_presistent}, queue_iter});
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/block/block_file_cache.h b/be/src/io/cache/block/block_file_cache.h
new file mode 100644
index 0000000000..056acb690d
--- /dev/null
+++ b/be/src/io/cache/block/block_file_cache.h
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.h
+// and modified by Doris
+
+#pragma once
+
+#include <list>
+#include <memory>
+#include <unordered_map>
+
+#include "common/config.h"
+#include "io/cache/block/block_file_cache_fwd.h"
+
+namespace doris {
+namespace io {
+class FileBlock;
+using FileBlockSPtr = std::shared_ptr<FileBlock>;
+using FileBlocks = std::list<FileBlockSPtr>;
+struct FileBlocksHolder;
+struct ReadSettings;
+
+/**
+ * Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments.
+ */
+class IFileCache {
+    friend class FileBlock;
+    friend struct FileBlocksHolder;
+
+public:
+    struct Key {
+        uint128_t key;
+        std::string to_string() const;
+
+        Key() = default;
+        explicit Key(const uint128_t& key_) : key(key_) {}
+
+        bool operator==(const Key& other) const { return key == other.key; }
+    };
+
+    IFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings);
+
+    virtual ~IFileCache() = default;
+
+    /// Restore cache from local filesystem.
+    virtual Status initialize() = 0;
+
+    virtual void remove_if_exists(const Key& key, bool is_persistent) = 0;
+
+    virtual void remove_if_releasable(bool is_persistent) = 0;
+
+    /// Cache capacity in bytes.
+    size_t capacity() const { return _max_size; }
+
+    static Key hash(const std::string& path);
+
+    std::string get_path_in_local_cache(const Key& key, size_t offset, bool is_persistent) const;
+
+    std::string get_path_in_local_cache(const Key& key) const;
+
+    const std::string& get_base_path() const { return _cache_base_path; }
+
+    virtual std::vector<std::string> try_get_cache_paths(const Key& key, bool is_persistent) = 0;
+
+    /**
+     * Given an `offset` and `size` representing [offset, offset + size) bytes interval,
+     * return list of cached non-overlapping non-empty
+     * file segments `[segment1, ..., segmentN]` which intersect with given interval.
+     *
+     * Segments in returned list are ordered in ascending order and represent a full contiguous
+     * interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
+     *
+     * As long as pointers to returned file segments are hold
+     * it is guaranteed that these file segments are not removed from cache.
+     */
+    virtual FileBlocksHolder get_or_set(const Key& key, size_t offset, size_t size,
+                                        bool is_persistent, const TUniqueId& query_id) = 0;
+
+    /// For debug.
+    virtual std::string dump_structure(const Key& key, bool is_persistent) = 0;
+
+    virtual size_t get_used_cache_size(bool is_persistent) const = 0;
+
+    virtual size_t get_file_segments_num(bool is_persistent) const = 0;
+
+    IFileCache& operator=(const IFileCache&) = delete;
+    IFileCache(const IFileCache&) = delete;
+
+protected:
+    std::string _cache_base_path;
+    size_t _max_size = 0;
+    size_t _max_element_size = 0;
+    size_t _persistent_max_size = 0;
+    size_t _persistent_max_element_size = 0;
+    size_t _max_file_segment_size = 0;
+    size_t _max_query_cache_size = 0;
+
+    bool _is_initialized = false;
+
+    mutable std::mutex _mutex;
+
+    virtual bool try_reserve(const Key& key, const TUniqueId& query_id, bool is_persistent,
+                             size_t offset, size_t size,
+                             std::lock_guard<std::mutex>& cache_lock) = 0;
+
+    virtual void remove(const Key& key, bool is_persistent, size_t offset,
+                        std::lock_guard<std::mutex>& cache_lock,
+                        std::lock_guard<std::mutex>& segment_lock) = 0;
+
+    class LRUQueue {
+    public:
+        struct FileKeyAndOffset {
+            Key key;
+            size_t offset;
+            size_t size;
+            bool is_persistent;
+
+            FileKeyAndOffset(const Key& key, size_t offset, size_t size, bool is_persistent)
+                    : key(key), offset(offset), size(size), is_persistent(is_persistent) {}
+        };
+
+        using Iterator = typename std::list<FileKeyAndOffset>::iterator;
+
+        size_t get_total_cache_size(std::lock_guard<std::mutex>& /* cache_lock */) const {
+            return cache_size;
+        }
+
+        size_t get_elements_num(std::lock_guard<std::mutex>& /* cache_lock */) const {
+            return queue.size();
+        }
+
+        Iterator add(const Key& key, size_t offset, bool is_persistent, size_t size,
+                     std::lock_guard<std::mutex>& cache_lock);
+
+        void remove(Iterator queue_it, std::lock_guard<std::mutex>& cache_lock);
+
+        void move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& cache_lock);
+
+        std::string to_string(std::lock_guard<std::mutex>& cache_lock) const;
+
+        bool contains(const Key& key, size_t offset, std::lock_guard<std::mutex>& cache_lock) const;
+
+        Iterator begin() { return queue.begin(); }
+
+        Iterator end() { return queue.end(); }
+
+        void remove_all(std::lock_guard<std::mutex>& cache_lock);
+
+    private:
+        std::list<FileKeyAndOffset> queue;
+        size_t cache_size = 0;
+    };
+
+    using AccessKeyAndOffset = std::tuple<Key, size_t, bool>;
+    struct KeyAndOffsetHash {
+        std::size_t operator()(const AccessKeyAndOffset& key) const {
+            return UInt128Hash()(std::get<0>(key).key) ^ std::hash<uint64_t>()(std::get<1>(key));
+        }
+    };
+
+    using AccessRecord =
+            std::unordered_map<AccessKeyAndOffset, LRUQueue::Iterator, KeyAndOffsetHash>;
+
+    /// Used to track and control the cache access of each query.
+    /// Through it, we can realize the processing of different queries by the cache layer.
+    struct QueryContext {
+        LRUQueue lru_queue;
+        AccessRecord records;
+
+        size_t max_cache_size = 0;
+
+        QueryContext(size_t max_cache_size) : max_cache_size(max_cache_size) {}
+
+        void remove(const Key& key, size_t offset, bool is_presistent, size_t size,
+                    std::lock_guard<std::mutex>& cache_lock);
+
+        void reserve(const Key& key, size_t offset, bool is_presistent, size_t size,
+                     std::lock_guard<std::mutex>& cache_lock);
+
+        size_t get_max_cache_size() const { return max_cache_size; }
+
+        size_t get_cache_size(std::lock_guard<std::mutex>& cache_lock) const {
+            return lru_queue.get_total_cache_size(cache_lock);
+        }
+
+        LRUQueue& queue() { return lru_queue; }
+    };
+
+    using QueryContextPtr = std::shared_ptr<QueryContext>;
+    using QueryContextMap = std::unordered_map<TUniqueId, QueryContextPtr>;
+
+    QueryContextMap _query_map;
+
+    bool _enable_file_cache_query_limit = config::enable_file_cache_query_limit;
+
+    QueryContextPtr get_query_context(const TUniqueId& query_id, std::lock_guard<std::mutex>&);
+
+    void remove_query_context(const TUniqueId& query_id);
+
+    QueryContextPtr get_or_set_query_context(const TUniqueId& query_id,
+                                             std::lock_guard<std::mutex>&);
+
+public:
+    /// Save a query context information, and adopt different cache policies
+    /// for different queries through the context cache layer.
+    struct QueryContextHolder {
+        QueryContextHolder(const TUniqueId& query_id, IFileCache* cache, QueryContextPtr context)
+                : query_id(query_id), cache(cache), context(context) {}
+
+        QueryContextHolder& operator=(const QueryContextHolder&) = delete;
+        QueryContextHolder(const QueryContextHolder&) = delete;
+
+        ~QueryContextHolder() {
+            /// If only the query_map and the current holder hold the context_query,
+            /// the query has been completed and the query_context is released.
+            if (context) {
+                context.reset();
+                cache->remove_query_context(query_id);
+            }
+        }
+
+        const TUniqueId& query_id;
+        IFileCache* cache = nullptr;
+        QueryContextPtr context;
+    };
+    using QueryContextHolderPtr = std::unique_ptr<QueryContextHolder>;
+    QueryContextHolderPtr get_query_context_holder(const TUniqueId& query_id);
+};
+
+using CloudFileCachePtr = IFileCache*;
+
+struct KeyHash {
+    std::size_t operator()(const IFileCache::Key& k) const { return UInt128Hash()(k.key); }
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/block/block_file_cache_factory.cpp b/be/src/io/cache/block/block_file_cache_factory.cpp
new file mode 100644
index 0000000000..be5d4a28b5
--- /dev/null
+++ b/be/src/io/cache/block/block_file_cache_factory.cpp
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCacheFactory.cpp
+// and modified by Doris
+
+#include "io/cache/block/block_file_cache_factory.h"
+
+#include <cstddef>
+
+#include "common/config.h"
+#include "io/cache/block/block_file_cache.h"
+#include "io/cache/block/block_lru_file_cache.h"
+#include "io/fs/local_file_system.h"
+
+namespace doris {
+namespace io {
+
+FileCacheFactory& FileCacheFactory::instance() {
+    static FileCacheFactory ret;
+    return ret;
+}
+
+Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,
+                                           const FileCacheSettings& file_cache_settings,
+                                           FileCacheType type) {
+    if (config::clear_file_cache) {
+        auto fs = global_local_filesystem();
+        bool res = false;
+        fs->exists(cache_base_path, &res);
+        if (res) {
+            fs->delete_directory(cache_base_path);
+            fs->create_directory(cache_base_path);
+        }
+    }
+
+    std::unique_ptr<IFileCache> cache =
+            std::make_unique<LRUFileCache>(cache_base_path, file_cache_settings);
+    RETURN_IF_ERROR(cache->initialize());
+    std::string file_cache_type;
+    switch (type) {
+    case NORMAL:
+        _caches.push_back(std::move(cache));
+        file_cache_type = "NORMAL";
+        break;
+    case DISPOSABLE:
+        _disposable_cache.push_back(std::move(cache));
+        file_cache_type = "DISPOSABLE";
+        break;
+    }
+    LOG(INFO) << "[FileCache] path: " << cache_base_path << " type: " << file_cache_type
+              << " normal_size: " << file_cache_settings.max_size
+              << " normal_element_size: " << file_cache_settings.max_elements
+              << " persistent_size: " << file_cache_settings.persistent_max_size
+              << " persistent_element_size: " << file_cache_settings.persistent_max_elements;
+    return Status::OK();
+}
+
+CloudFileCachePtr FileCacheFactory::get_by_path(const IFileCache::Key& key) {
+    return _caches[KeyHash()(key) % _caches.size()].get();
+}
+
+CloudFileCachePtr FileCacheFactory::get_disposable_cache(const IFileCache::Key& key) {
+    if (_disposable_cache.empty()) {
+        return nullptr;
+    }
+    return _disposable_cache[KeyHash()(key) % _caches.size()].get();
+}
+
+std::vector<IFileCache::QueryContextHolderPtr> FileCacheFactory::get_query_context_holders(
+        const TUniqueId& query_id) {
+    std::vector<IFileCache::QueryContextHolderPtr> holders;
+    for (const auto& cache : _caches) {
+        holders.push_back(cache->get_query_context_holder(query_id));
+    }
+    return holders;
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/block/block_file_cache_factory.h b/be/src/io/cache/block/block_file_cache_factory.h
new file mode 100644
index 0000000000..142455b5f0
--- /dev/null
+++ b/be/src/io/cache/block/block_file_cache_factory.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCacheFactory.h
+// and modified by Doris
+
+#pragma once
+
+#include <vector>
+
+#include "io/cache/block/block_file_cache.h"
+#include "io/cache/block/block_file_cache_fwd.h"
+#include "io/cache/block/block_file_cache_settings.h"
+namespace doris {
+namespace io {
+
+enum FileCacheType {
+    NORMAL,
+    DISPOSABLE,
+};
+/**
+ * Creates a FileCache object for cache_base_path.
+ */
+class FileCacheFactory {
+public:
+    static FileCacheFactory& instance();
+
+    Status create_file_cache(const std::string& cache_base_path,
+                             const FileCacheSettings& file_cache_settings, FileCacheType type);
+
+    CloudFileCachePtr get_by_path(const IFileCache::Key& key);
+    CloudFileCachePtr get_disposable_cache(const IFileCache::Key& key);
+    std::vector<IFileCache::QueryContextHolderPtr> get_query_context_holders(
+            const TUniqueId& query_id);
+    FileCacheFactory() = default;
+    FileCacheFactory& operator=(const FileCacheFactory&) = delete;
+    FileCacheFactory(const FileCacheFactory&) = delete;
+
+private:
+    std::vector<std::unique_ptr<IFileCache>> _caches;
+    std::vector<std::unique_ptr<IFileCache>> _disposable_cache;
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/path.h b/be/src/io/cache/block/block_file_cache_fwd.h
similarity index 65%
copy from be/src/io/fs/path.h
copy to be/src/io/cache/block/block_file_cache_fwd.h
index 9832ea6322..d3d3438593 100644
--- a/be/src/io/fs/path.h
+++ b/be/src/io/cache/block/block_file_cache_fwd.h
@@ -14,19 +14,24 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache_fwd.h
+// and modified by Doris
 
 #pragma once
+#include <memory>
 
-#include <filesystem>
+#include "vec/common/uint128.h"
 
+static constexpr size_t GB = 1 * 1024 * 1024 * 1024;
+static constexpr size_t KB = 1024;
 namespace doris {
 namespace io {
 
-using Path = std::filesystem::path;
-
-inline Path operator/(Path&& lhs, const Path& rhs) {
-    return std::move(lhs /= rhs);
-}
+using uint128_t = vectorized::UInt128;
+using UInt128Hash = vectorized::UInt128Hash;
+static constexpr size_t REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS = 100 * 1024;
 
+struct FileCacheSettings;
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/cache/block/block_file_cache_profile.cpp b/be/src/io/cache/block/block_file_cache_profile.cpp
new file mode 100644
index 0000000000..c133fc3b16
--- /dev/null
+++ b/be/src/io/cache/block/block_file_cache_profile.cpp
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/block/block_file_cache_profile.h"
+
+#include <memory>
+
+#include "http/http_common.h"
+
+namespace doris {
+namespace io {
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_cache_num_io_total, MetricUnit::OPERATIONS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_cache_num_io_hit_cache, MetricUnit::OPERATIONS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_cache_num_io_bytes_read_total, MetricUnit::OPERATIONS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_cache_num_io_bytes_read_from_file_cache,
+                                     MetricUnit::OPERATIONS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_cache_num_io_bytes_read_from_write_cache,
+                                     MetricUnit::OPERATIONS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_cache_num_io_written_in_file_cache,
+                                     MetricUnit::OPERATIONS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_cache_num_io_bytes_written_in_file_cache,
+                                     MetricUnit::OPERATIONS);
+
+FileCacheStatistics FileCacheProfile::report(int64_t table_id, int64_t partition_id) {
+    FileCacheStatistics stats;
+    if (_profile.count(table_id) == 1 && _profile[table_id].count(partition_id) == 1) {
+        std::shared_ptr<AtomicStatistics> count;
+        {
+            std::lock_guard lock(_mtx);
+            count = _profile[table_id][partition_id];
+        }
+        stats.num_io_total = count->num_io_total.load(std::memory_order_relaxed);
+        stats.num_io_hit_cache = count->num_io_hit_cache.load(std::memory_order_relaxed);
+        stats.num_io_bytes_read_total =
+                count->num_io_bytes_read_total.load(std::memory_order_relaxed);
+        stats.num_io_bytes_read_from_file_cache =
+                count->num_io_bytes_read_from_file_cache.load(std::memory_order_relaxed);
+        stats.num_io_bytes_read_from_write_cache =
+                count->num_io_bytes_read_from_write_cache.load(std::memory_order_relaxed);
+        stats.num_io_written_in_file_cache =
+                count->num_io_written_in_file_cache.load(std::memory_order_relaxed);
+        stats.num_io_bytes_written_in_file_cache =
+                count->num_io_bytes_written_in_file_cache.load(std::memory_order_relaxed);
+    }
+    return stats;
+}
+
+FileCacheStatistics FileCacheProfile::report(int64_t table_id) {
+    FileCacheStatistics stats;
+    if (_profile.count(table_id) == 1) {
+        std::lock_guard lock(_mtx);
+        auto& partition_map = _profile[table_id];
+        for (auto& [partition_id, atomic_stats] : partition_map) {
+            stats.num_io_total += atomic_stats->num_io_total.load(std::memory_order_relaxed);
+            stats.num_io_hit_cache +=
+                    atomic_stats->num_io_hit_cache.load(std::memory_order_relaxed);
+            stats.num_io_bytes_read_total +=
+                    atomic_stats->num_io_bytes_read_total.load(std::memory_order_relaxed);
+            stats.num_io_bytes_read_from_file_cache +=
+                    atomic_stats->num_io_bytes_read_from_file_cache.load(std::memory_order_relaxed);
+            stats.num_io_bytes_read_from_write_cache +=
+                    atomic_stats->num_io_bytes_read_from_write_cache.load(
+                            std::memory_order_relaxed);
+            stats.num_io_written_in_file_cache +=
+                    atomic_stats->num_io_written_in_file_cache.load(std::memory_order_relaxed);
+            stats.num_io_bytes_written_in_file_cache +=
+                    atomic_stats->num_io_bytes_written_in_file_cache.load(
+                            std::memory_order_relaxed);
+        }
+    }
+    return stats;
+}
+
+void FileCacheProfile::update(int64_t table_id, int64_t partition_id, OlapReaderStatistics* stats) {
+    if (!s_enable_profile.load(std::memory_order_acquire)) {
+        return;
+    }
+    std::shared_ptr<AtomicStatistics> count;
+    std::shared_ptr<FileCacheMetric> partition_metric;
+    std::shared_ptr<FileCacheMetric> table_metric;
+    {
+        std::lock_guard lock(_mtx);
+        if (_profile.count(table_id) < 1 || _profile[table_id].count(partition_id) < 1) {
+            _profile[table_id][partition_id] = std::make_shared<AtomicStatistics>();
+            partition_metric = std::make_shared<FileCacheMetric>(table_id, partition_id, this);
+            _partition_metrics[table_id][partition_id] = partition_metric;
+            if (_table_metrics.count(table_id) < 1) {
+                table_metric = std::make_shared<FileCacheMetric>(table_id, this);
+                _table_metrics[table_id] = table_metric;
+            }
+        }
+        count = _profile[table_id][partition_id];
+    }
+    if (partition_metric) [[unlikely]] {
+        partition_metric->register_entity();
+    }
+    if (table_metric) [[unlikely]] {
+        table_metric->register_entity();
+    }
+    count->num_io_total.fetch_add(stats->file_cache_stats.num_io_total, std::memory_order_relaxed);
+    count->num_io_hit_cache.fetch_add(stats->file_cache_stats.num_io_hit_cache,
+                                      std::memory_order_relaxed);
+    count->num_io_bytes_read_total.fetch_add(stats->file_cache_stats.num_io_bytes_read_total,
+                                             std::memory_order_relaxed);
+    count->num_io_bytes_read_from_file_cache.fetch_add(
+            stats->file_cache_stats.num_io_bytes_read_from_file_cache, std::memory_order_relaxed);
+    count->num_io_bytes_read_from_write_cache.fetch_add(
+            stats->file_cache_stats.num_io_bytes_read_from_write_cache, std::memory_order_relaxed);
+    count->num_io_written_in_file_cache.fetch_add(
+            stats->file_cache_stats.num_io_written_in_file_cache, std::memory_order_relaxed);
+    count->num_io_bytes_written_in_file_cache.fetch_add(
+            stats->file_cache_stats.num_io_bytes_written_in_file_cache, std::memory_order_relaxed);
+}
+
+void FileCacheProfile::deregister_metric(int64_t table_id, int64_t partition_id) {
+    if (!s_enable_profile.load(std::memory_order_acquire)) {
+        return;
+    }
+    std::shared_ptr<FileCacheMetric> partition_metric;
+    std::shared_ptr<FileCacheMetric> table_metric;
+    {
+        std::lock_guard lock(_mtx);
+        partition_metric = _partition_metrics[table_id][partition_id];
+        _partition_metrics[table_id].erase(partition_id);
+        if (_partition_metrics[table_id].empty()) {
+            _partition_metrics.erase(table_id);
+            table_metric = _table_metrics[table_id];
+            _table_metrics.erase(table_id);
+        }
+        _profile[table_id].erase(partition_id);
+        if (_profile[table_id].empty()) {
+            _profile.erase(table_id);
+        }
+    }
+    partition_metric->deregister_entity();
+    if (table_metric) {
+        table_metric->deregister_entity();
+    }
+}
+
+void FileCacheMetric::register_entity() {
+    std::string name = "table_" + std::to_string(table_id);
+    if (partition_id != -1) {
+        name += "_partition_" + std::to_string(partition_id);
+    }
+    entity = DorisMetrics::instance()->metric_registry()->register_entity(
+            std::string("cloud_file_cache"), {{"name", name}});
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, file_cache_num_io_total);
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, file_cache_num_io_hit_cache);
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, file_cache_num_io_bytes_read_total);
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, file_cache_num_io_bytes_read_from_file_cache);
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, file_cache_num_io_bytes_read_from_write_cache);
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, file_cache_num_io_written_in_file_cache);
+    INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, file_cache_num_io_bytes_written_in_file_cache);
+    entity->register_hook(name, std::bind(&FileCacheMetric::update_table_metrics, this));
+}
+
+void FileCacheMetric::update_table_metrics() const {
+    FileCacheStatistics stats = profile->report(table_id);
+    file_cache_num_io_total->set_value(stats.num_io_total);
+    file_cache_num_io_hit_cache->set_value(stats.num_io_hit_cache);
+    file_cache_num_io_bytes_read_total->set_value(stats.num_io_bytes_read_total);
+    file_cache_num_io_bytes_read_from_file_cache->set_value(
+            stats.num_io_bytes_read_from_file_cache);
+    file_cache_num_io_bytes_read_from_write_cache->set_value(
+            stats.num_io_bytes_read_from_write_cache);
+    file_cache_num_io_written_in_file_cache->set_value(stats.num_io_written_in_file_cache);
+    file_cache_num_io_bytes_written_in_file_cache->set_value(
+            stats.num_io_bytes_written_in_file_cache);
+}
+
+void FileCacheMetric::update_partition_metrics() const {
+    FileCacheStatistics stats = profile->report(table_id, partition_id);
+    file_cache_num_io_total->set_value(stats.num_io_total);
+    file_cache_num_io_hit_cache->set_value(stats.num_io_hit_cache);
+    file_cache_num_io_bytes_read_total->set_value(stats.num_io_bytes_read_total);
+    file_cache_num_io_bytes_read_from_file_cache->set_value(
+            stats.num_io_bytes_read_from_file_cache);
+    file_cache_num_io_bytes_read_from_write_cache->set_value(
+            stats.num_io_bytes_read_from_write_cache);
+    file_cache_num_io_written_in_file_cache->set_value(stats.num_io_written_in_file_cache);
+    file_cache_num_io_bytes_written_in_file_cache->set_value(
+            stats.num_io_bytes_written_in_file_cache);
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/block/block_file_cache_profile.h b/be/src/io/cache/block/block_file_cache_profile.h
new file mode 100644
index 0000000000..64ca4eee9b
--- /dev/null
+++ b/be/src/io/cache/block/block_file_cache_profile.h
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <cstdint>
+#include <cstdlib>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+
+#include "olap/olap_common.h"
+#include "util/doris_metrics.h"
+#include "util/metrics.h"
+
+namespace doris {
+namespace io {
+
+struct AtomicStatistics {
+    std::atomic<int64_t> num_io_total = 0;
+    std::atomic<int64_t> num_io_hit_cache = 0;
+    std::atomic<int64_t> num_io_bytes_read_total = 0;
+    std::atomic<int64_t> num_io_bytes_read_from_file_cache = 0;
+    std::atomic<int64_t> num_io_bytes_read_from_write_cache = 0;
+    std::atomic<int64_t> num_io_written_in_file_cache = 0;
+    std::atomic<int64_t> num_io_bytes_written_in_file_cache = 0;
+};
+
+struct FileCacheProfile;
+
+struct FileCacheMetric {
+    FileCacheMetric(int64_t table_id, FileCacheProfile* profile)
+            : profile(profile), table_id(table_id) {}
+
+    FileCacheMetric(int64_t table_id, int64_t partition_id, FileCacheProfile* profile)
+            : profile(profile), table_id(table_id), partition_id(partition_id) {}
+
+    void register_entity();
+    void deregister_entity() const {
+        DorisMetrics::instance()->metric_registry()->deregister_entity(entity);
+    }
+    void update_table_metrics() const;
+    void update_partition_metrics() const;
+
+    FileCacheMetric& operator=(const FileCacheMetric&) = delete;
+    FileCacheMetric(const FileCacheMetric&) = delete;
+    FileCacheProfile* profile = nullptr;
+    int64_t table_id = -1;
+    int64_t partition_id = -1;
+    std::shared_ptr<MetricEntity> entity;
+    IntAtomicCounter* file_cache_num_io_total = nullptr;
+    IntAtomicCounter* file_cache_num_io_hit_cache = nullptr;
+    IntAtomicCounter* file_cache_num_io_bytes_read_total = nullptr;
+    IntAtomicCounter* file_cache_num_io_bytes_read_from_file_cache = nullptr;
+    IntAtomicCounter* file_cache_num_io_bytes_read_from_write_cache = nullptr;
+    IntAtomicCounter* file_cache_num_io_written_in_file_cache = nullptr;
+    IntAtomicCounter* file_cache_num_io_bytes_written_in_file_cache = nullptr;
+};
+
+struct FileCacheProfile {
+    static FileCacheProfile& instance() {
+        static FileCacheProfile s_profile;
+        return s_profile;
+    }
+
+    FileCacheProfile() {
+        OlapReaderStatistics stats;
+        update(0, 0, &stats);
+    }
+
+    // avoid performance impact, use https to control
+    inline static std::atomic<bool> s_enable_profile = true;
+
+    static void set_enable_profile(bool flag) {
+        // if enable_profile = false originally, set true, it will clear the count
+        if (!s_enable_profile && flag) {
+            std::lock_guard lock(instance()._mtx);
+            instance()._profile.clear();
+        }
+        s_enable_profile.store(flag, std::memory_order_release);
+    }
+
+    void update(int64_t table_id, int64_t partition_id, OlapReaderStatistics* stats);
+
+    void deregister_metric(int64_t table_id, int64_t partition_id);
+    std::mutex _mtx;
+    // use shared_ptr for concurrent
+    std::unordered_map<int64_t, std::unordered_map<int64_t, std::shared_ptr<AtomicStatistics>>>
+            _profile;
+    std::unordered_map<int64_t, std::shared_ptr<FileCacheMetric>> _table_metrics;
+    std::unordered_map<int64_t, std::unordered_map<int64_t, std::shared_ptr<FileCacheMetric>>>
+            _partition_metrics;
+    FileCacheStatistics report(int64_t table_id);
+    FileCacheStatistics report(int64_t table_id, int64_t partition_id);
+};
+
+} // namespace io
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/io/fs/path.h b/be/src/io/cache/block/block_file_cache_settings.h
similarity index 61%
copy from be/src/io/fs/path.h
copy to be/src/io/cache/block/block_file_cache_settings.h
index 9832ea6322..ed61ac8d19 100644
--- a/be/src/io/fs/path.h
+++ b/be/src/io/cache/block/block_file_cache_settings.h
@@ -14,19 +14,27 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCacheSettings.h
+// and modified by Doris
 
 #pragma once
 
-#include <filesystem>
+#include "io/cache/block/block_file_cache_fwd.h"
 
 namespace doris {
 namespace io {
 
-using Path = std::filesystem::path;
+struct FileCacheSettings {
+    size_t max_size = 0;
+    size_t max_elements = REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS;
+    // use a priority policy to eliminate
+    size_t persistent_max_size = 0;
+    size_t persistent_max_elements = REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS;
 
-inline Path operator/(Path&& lhs, const Path& rhs) {
-    return std::move(lhs /= rhs);
-}
+    size_t max_file_segment_size = 0;
+    size_t max_query_cache_size = 0;
+};
 
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/cache/block/block_file_segment.cpp b/be/src/io/cache/block/block_file_segment.cpp
new file mode 100644
index 0000000000..ae98270897
--- /dev/null
+++ b/be/src/io/cache/block/block_file_segment.cpp
@@ -0,0 +1,317 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileSegment.cpp
+// and modified by Doris
+
+#include "io/cache/block/block_file_segment.h"
+
+#include <filesystem>
+#include <sstream>
+#include <string>
+#include <thread>
+
+#include "common/status.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_writer.h"
+#include "io/fs/local_file_system.h"
+#include "olap/iterators.h"
+#include "vec/common/hex.h"
+
+namespace doris {
+namespace io {
+
+FileBlock::FileBlock(size_t offset_, size_t size_, const Key& key_, IFileCache* cache_,
+                     State download_state_, bool is_persistent)
+        : _segment_range(offset_, offset_ + size_ - 1),
+          _download_state(download_state_),
+          _file_key(key_),
+          _cache(cache_),
+          _is_persistent(is_persistent) {
+    /// On creation, file segment state can be EMPTY, DOWNLOADED, DOWNLOADING.
+    switch (_download_state) {
+    /// EMPTY is used when file segment is not in cache and
+    /// someone will _potentially_ want to download it (after calling getOrSetDownloader()).
+    case State::EMPTY:
+    case State::SKIP_CACHE: {
+        break;
+    }
+    /// DOWNLOADED is used either on initial cache metadata load into memory on server startup
+    /// or on reduceSizeToDownloaded() -- when file segment object is updated.
+    case State::DOWNLOADED: {
+        _downloaded_size = size_;
+        break;
+    }
+    /// DOWNLOADING is used only for write-through caching (e.g. getOrSetDownloader() is not
+    /// needed, downloader is set on file segment creation).
+    case State::DOWNLOADING: {
+        _downloader_id = get_caller_id();
+        break;
+    }
+    default: {
+        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING, SKIP_CACHE ";
+    }
+    }
+}
+
+FileBlock::State FileBlock::state() const {
+    std::lock_guard segment_lock(_mutex);
+    return _download_state;
+}
+
+size_t FileBlock::get_download_offset() const {
+    std::lock_guard segment_lock(_mutex);
+    return range().left + get_downloaded_size(segment_lock);
+}
+
+size_t FileBlock::get_downloaded_size() const {
+    std::lock_guard segment_lock(_mutex);
+    return get_downloaded_size(segment_lock);
+}
+
+size_t FileBlock::get_downloaded_size(std::lock_guard<std::mutex>& /* segment_lock */) const {
+    if (_download_state == State::DOWNLOADED) {
+        return _downloaded_size;
+    }
+
+    std::lock_guard download_lock(_download_mutex);
+    return _downloaded_size;
+}
+
+std::string FileBlock::get_caller_id() {
+    std::stringstream ss;
+    ss << std::this_thread::get_id();
+    return ss.str();
+}
+
+std::string FileBlock::get_or_set_downloader() {
+    std::lock_guard segment_lock(_mutex);
+
+    if (_downloader_id.empty()) {
+        DCHECK(_download_state != State::DOWNLOADING);
+
+        _downloader_id = get_caller_id();
+        _download_state = State::DOWNLOADING;
+    } else if (_downloader_id == get_caller_id()) {
+        LOG(INFO) << "Attempt to set the same downloader for segment " << range().to_string()
+                  << " for the second time";
+    }
+
+    return _downloader_id;
+}
+
+void FileBlock::reset_downloader(std::lock_guard<std::mutex>& segment_lock) {
+    DCHECK(!_downloader_id.empty()) << "There is no downloader";
+
+    DCHECK(get_caller_id() == _downloader_id) << "Downloader can be reset only by downloader";
+
+    reset_downloader_impl(segment_lock);
+}
+
+void FileBlock::reset_downloader_impl(std::lock_guard<std::mutex>& segment_lock) {
+    if (_downloaded_size == range().size()) {
+        set_downloaded(segment_lock);
+    } else {
+        _downloaded_size = 0;
+        _download_state = State::EMPTY;
+        _downloader_id.clear();
+    }
+}
+
+std::string FileBlock::get_downloader() const {
+    std::lock_guard segment_lock(_mutex);
+    return _downloader_id;
+}
+
+bool FileBlock::is_downloader() const {
+    std::lock_guard segment_lock(_mutex);
+    return get_caller_id() == _downloader_id;
+}
+
+bool FileBlock::is_downloader_impl(std::lock_guard<std::mutex>& /* segment_lock */) const {
+    return get_caller_id() == _downloader_id;
+}
+
+Status FileBlock::append(Slice data) {
+    DCHECK(data.size != 0) << "Writing zero size is not allowed";
+
+    if (!_cache_writer) {
+        auto download_path = get_path_in_local_cache();
+        RETURN_IF_ERROR(global_local_filesystem()->create_file(download_path, &_cache_writer));
+    }
+
+    RETURN_IF_ERROR(_cache_writer->append(data));
+
+    std::lock_guard download_lock(_download_mutex);
+
+    _downloaded_size += data.size;
+    return Status::OK();
+}
+
+std::string FileBlock::get_path_in_local_cache() const {
+    return _cache->get_path_in_local_cache(key(), offset(), _is_persistent);
+}
+
+Status FileBlock::read_at(Slice buffer, size_t offset) {
+    if (!_cache_reader) {
+        std::lock_guard segment_lock(_mutex);
+        if (!_cache_reader) {
+            auto download_path = get_path_in_local_cache();
+            RETURN_IF_ERROR(
+                    global_local_filesystem()->open_file(download_path, &_cache_reader, nullptr));
+        }
+    }
+    size_t bytes_reads = buffer.size;
+    IOContext io_ctx;
+    RETURN_IF_ERROR(_cache_reader->read_at(offset, buffer, io_ctx, &bytes_reads));
+    DCHECK(bytes_reads == buffer.size);
+    return Status::OK();
+}
+
+Status FileBlock::finalize_write() {
+    std::lock_guard segment_lock(_mutex);
+
+    RETURN_IF_ERROR(set_downloaded(segment_lock));
+    _cv.notify_all();
+    return Status::OK();
+}
+
+FileBlock::State FileBlock::wait() {
+    std::unique_lock segment_lock(_mutex);
+
+    if (_downloader_id.empty()) {
+        return _download_state;
+    }
+
+    if (_download_state == State::DOWNLOADING) {
+        DCHECK(!_downloader_id.empty());
+        DCHECK(_downloader_id != get_caller_id());
+
+        _cv.wait_for(segment_lock, std::chrono::seconds(1));
+    }
+
+    return _download_state;
+}
+
+Status FileBlock::set_downloaded(std::lock_guard<std::mutex>& /* segment_lock */) {
+    if (_is_downloaded) {
+        return Status::OK();
+    }
+
+    if (_cache_writer) {
+        RETURN_IF_ERROR(_cache_writer->close());
+        _cache_writer.reset();
+    }
+
+    _download_state = State::DOWNLOADED;
+    _is_downloaded = true;
+    _downloader_id.clear();
+    return Status::OK();
+}
+
+void FileBlock::complete(std::lock_guard<std::mutex>& cache_lock) {
+    std::lock_guard segment_lock(_mutex);
+
+    complete_unlocked(cache_lock, segment_lock);
+}
+
+void FileBlock::complete_unlocked(std::lock_guard<std::mutex>& cache_lock,
+                                  std::lock_guard<std::mutex>& segment_lock) {
+    if (is_downloader_impl(segment_lock)) {
+        reset_downloader(segment_lock);
+        _cv.notify_all();
+    }
+}
+
+std::string FileBlock::get_info_for_log() const {
+    std::lock_guard segment_lock(_mutex);
+    return get_info_for_log_impl(segment_lock);
+}
+
+std::string FileBlock::get_info_for_log_impl(std::lock_guard<std::mutex>& segment_lock) const {
+    std::stringstream info;
+    info << "File segment: " << range().to_string() << ", ";
+    info << "state: " << state_to_string(_download_state) << ", ";
+    info << "downloaded size: " << get_downloaded_size(segment_lock) << ", ";
+    info << "downloader id: " << _downloader_id << ", ";
+    info << "caller id: " << get_caller_id();
+
+    return info.str();
+}
+
+std::string FileBlock::state_to_string(FileBlock::State state) {
+    switch (state) {
+    case FileBlock::State::DOWNLOADED:
+        return "DOWNLOADED";
+    case FileBlock::State::EMPTY:
+        return "EMPTY";
+    case FileBlock::State::DOWNLOADING:
+        return "DOWNLOADING";
+    case FileBlock::State::SKIP_CACHE:
+        return "SKIP_CACHE";
+    default:
+        DCHECK(false);
+        return "";
+    }
+}
+
+bool FileBlock::has_finalized_state() const {
+    return _download_state == State::DOWNLOADED;
+}
+
+FileBlock::~FileBlock() {
+    std::lock_guard segment_lock(_mutex);
+}
+
+FileBlocksHolder::~FileBlocksHolder() {
+    /// In CacheableReadBufferFromRemoteFS file segment's downloader removes file segments from
+    /// FileBlocksHolder right after calling file_segment->complete(), so on destruction here
+    /// remain only uncompleted file segments.
+
+    IFileCache* cache = nullptr;
+
+    for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();) {
+        auto current_file_segment_it = file_segment_it;
+        auto& file_segment = *current_file_segment_it;
+
+        if (!cache) {
+            cache = file_segment->_cache;
+        }
+
+        /// File segment pointer must be reset right after calling complete() and
+        /// under the same mutex, because complete() checks for segment pointers.
+        std::lock_guard cache_lock(cache->_mutex);
+
+        file_segment->complete(cache_lock);
+
+        file_segment_it = file_segments.erase(current_file_segment_it);
+    }
+}
+
+std::string FileBlocksHolder::to_string() {
+    std::string ranges;
+    for (const auto& file_segment : file_segments) {
+        if (!ranges.empty()) {
+            ranges += ", ";
+        }
+        ranges += file_segment->range().to_string();
+    }
+    return ranges;
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/block/block_file_segment.h b/be/src/io/cache/block/block_file_segment.h
new file mode 100644
index 0000000000..47732cb70e
--- /dev/null
+++ b/be/src/io/cache/block/block_file_segment.h
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileSegment.h
+// and modified by Doris
+
+#pragma once
+
+#include <condition_variable>
+#include <list>
+#include <memory>
+#include <mutex>
+
+#include "common/status.h"
+#include "io/cache/block/block_file_cache.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_writer.h"
+
+namespace doris {
+namespace io {
+
+class FileBlock;
+using FileBlockSPtr = std::shared_ptr<FileBlock>;
+using FileBlocks = std::list<FileBlockSPtr>;
+
+class FileBlock {
+    friend class LRUFileCache;
+    friend struct FileBlocksHolder;
+
+public:
+    using Key = IFileCache::Key;
+    using LocalWriterPtr = std::unique_ptr<FileWriter>;
+    using LocalReaderPtr = std::shared_ptr<FileReader>;
+
+    enum class State {
+        DOWNLOADED,
+        /**
+         * When file segment is first created and returned to user, it has state EMPTY.
+         * EMPTY state can become DOWNLOADING when getOrSetDownaloder is called successfully
+         * by any owner of EMPTY state file segment.
+         */
+        EMPTY,
+        /**
+         * A newly created file segment never has DOWNLOADING state until call to getOrSetDownloader
+         * because each cache user might acquire multiple file segments and reads them one by one,
+         * so only user which actually needs to read this segment earlier than others - becomes a downloader.
+         */
+        DOWNLOADING,
+        SKIP_CACHE,
+    };
+
+    FileBlock(size_t offset_, size_t size_, const Key& key_, IFileCache* cache_,
+              State download_state_, bool is_persistent);
+
+    ~FileBlock();
+
+    State state() const;
+
+    static std::string state_to_string(FileBlock::State state);
+
+    /// Represents an interval [left, right] including both boundaries.
+    struct Range {
+        size_t left;
+        size_t right;
+
+        Range(size_t left_, size_t right_) : left(left_), right(right_) {}
+
+        bool operator==(const Range& other) const {
+            return left == other.left && right == other.right;
+        }
+
+        size_t size() const { return right - left + 1; }
+
+        std::string to_string() const {
+            return fmt::format("[{}, {}]", std::to_string(left), std::to_string(right));
+        }
+    };
+
+    const Range& range() const { return _segment_range; }
+
+    const Key& key() const { return _file_key; }
+
+    size_t offset() const { return range().left; }
+
+    State wait();
+
+    // append data to cache file
+    Status append(Slice data);
+
+    // read data from cache file
+    Status read_at(Slice buffer, size_t offset_);
+
+    // finish write, release the file writer
+    Status finalize_write();
+
+    // set downloader if state == EMPTY
+    std::string get_or_set_downloader();
+
+    std::string get_downloader() const;
+
+    void reset_downloader(std::lock_guard<std::mutex>& segment_lock);
+
+    bool is_downloader() const;
+
+    bool is_downloaded() const { return _is_downloaded.load(); }
+
+    bool is_persistent() const { return _is_persistent; }
+
+    static std::string get_caller_id();
+
+    size_t get_download_offset() const;
+
+    size_t get_downloaded_size() const;
+
+    std::string get_info_for_log() const;
+
+    std::string get_path_in_local_cache() const;
+
+    FileBlock& operator=(const FileBlock&) = delete;
+    FileBlock(const FileBlock&) = delete;
+
+private:
+    size_t get_downloaded_size(std::lock_guard<std::mutex>& segment_lock) const;
+    std::string get_info_for_log_impl(std::lock_guard<std::mutex>& segment_lock) const;
+    bool has_finalized_state() const;
+
+    Status set_downloaded(std::lock_guard<std::mutex>& segment_lock);
+    bool is_downloader_impl(std::lock_guard<std::mutex>& segment_lock) const;
+
+    /// complete() without any completion state is called from destructor of
+    /// FileBlocksHolder. complete() might check if the caller of the method
+    /// is the last alive holder of the segment. Therefore, complete() and destruction
+    /// of the file segment pointer must be done under the same cache mutex.
+    void complete(std::lock_guard<std::mutex>& cache_lock);
+    void complete_unlocked(std::lock_guard<std::mutex>& cache_lock,
+                           std::lock_guard<std::mutex>& segment_lock);
+
+    void reset_downloader_impl(std::lock_guard<std::mutex>& segment_lock);
+
+    const Range _segment_range;
+
+    State _download_state;
+
+    std::string _downloader_id;
+
+    LocalWriterPtr _cache_writer;
+    LocalReaderPtr _cache_reader;
+
+    size_t _downloaded_size = 0;
+
+    /// global locking order rule:
+    /// 1. cache lock
+    /// 2. segment lock
+
+    mutable std::mutex _mutex;
+    std::condition_variable _cv;
+
+    /// Protects downloaded_size access with actual write into fs.
+    /// downloaded_size is not protected by download_mutex in methods which
+    /// can never be run in parallel to FileBlock::write() method
+    /// as downloaded_size is updated only in FileBlock::write() method.
+    /// Such methods are identified by isDownloader() check at their start,
+    /// e.g. they are executed strictly by the same thread, sequentially.
+    mutable std::mutex _download_mutex;
+
+    Key _file_key;
+    IFileCache* _cache;
+
+    std::atomic<bool> _is_downloaded {false};
+    bool _is_persistent = false;
+};
+
+struct FileBlocksHolder {
+    explicit FileBlocksHolder(FileBlocks&& file_segments_)
+            : file_segments(std::move(file_segments_)) {}
+
+    FileBlocksHolder(FileBlocksHolder&& other) noexcept
+            : file_segments(std::move(other.file_segments)) {}
+
+    FileBlocksHolder& operator=(const FileBlocksHolder&) = delete;
+    FileBlocksHolder(const FileBlocksHolder&) = delete;
+
+    ~FileBlocksHolder();
+
+    FileBlocks file_segments {};
+
+    std::string to_string();
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp
new file mode 100644
index 0000000000..d925c77e57
--- /dev/null
+++ b/be/src/io/cache/block/block_lru_file_cache.cpp
@@ -0,0 +1,844 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/LRUFileCachePriority.cpp
+// and modified by Doris
+
+#include "io/cache/block/block_lru_file_cache.h"
+
+#include <filesystem>
+#include <random>
+#include <system_error>
+#include <utility>
+
+#include "common/status.h"
+#include "io/cache/block/block_file_cache.h"
+#include "io/cache/block/block_file_cache_settings.h"
+#include "util/time.h"
+#include "vec/common/hex.h"
+#include "vec/common/sip_hash.h"
+
+namespace fs = std::filesystem;
+
+namespace doris {
+namespace io {
+
+LRUFileCache::LRUFileCache(const std::string& cache_base_path_,
+                           const FileCacheSettings& cache_settings_)
+        : IFileCache(cache_base_path_, cache_settings_) {}
+
+Status LRUFileCache::initialize() {
+    std::lock_guard cache_lock(_mutex);
+    if (!_is_initialized) {
+        if (fs::exists(_cache_base_path)) {
+            load_cache_info_into_memory(cache_lock);
+        } else {
+            std::error_code ec;
+            fs::create_directories(_cache_base_path, ec);
+            if (ec) {
+                return Status::IOError("cannot create {}: {}", _cache_base_path,
+                                       std::strerror(ec.value()));
+            }
+        }
+    }
+    _is_initialized = true;
+    return Status::OK();
+}
+
+void LRUFileCache::use_cell(const FileBlockCell& cell, const TUniqueId& query_id,
+                            bool is_persistent, FileBlocks& result,
+                            std::lock_guard<std::mutex>& cache_lock) {
+    auto file_segment = cell.file_segment;
+    LRUQueue* queue = is_persistent ? &_persistent_queue : &_queue;
+    DCHECK(!(file_segment->is_downloaded() &&
+             fs::file_size(get_path_in_local_cache(file_segment->key(), file_segment->offset(),
+                                                   is_persistent)) == 0))
+            << "Cannot have zero size downloaded file segments. Current file segment: "
+            << file_segment->range().to_string();
+
+    result.push_back(cell.file_segment);
+
+    DCHECK(cell.queue_iterator);
+    /// Move to the end of the queue. The iterator remains valid.
+    queue->move_to_end(*cell.queue_iterator, cache_lock);
+}
+
+LRUFileCache::FileBlockCell* LRUFileCache::get_cell(const Key& key, bool is_persistent,
+                                                    size_t offset,
+                                                    std::lock_guard<std::mutex>& /* cache_lock */) {
+    auto it = _files.find(std::make_pair(key, is_persistent));
+    if (it == _files.end()) {
+        return nullptr;
+    }
+
+    auto& offsets = it->second;
+    auto cell_it = offsets.find(offset);
+    if (cell_it == offsets.end()) {
+        return nullptr;
+    }
+
+    return &cell_it->second;
+}
+
+FileBlocks LRUFileCache::get_impl(const Key& key, const TUniqueId& query_id, bool is_persistent,
+                                  const FileBlock::Range& range,
+                                  std::lock_guard<std::mutex>& cache_lock) {
+    /// Given range = [left, right] and non-overlapping ordered set of file segments,
+    /// find list [segment1, ..., segmentN] of segments which intersect with given range.
+    auto file_key = std::make_pair(key, is_persistent);
+    auto it = _files.find(file_key);
+    if (it == _files.end()) {
+        return {};
+    }
+
+    const auto& file_segments = it->second;
+    if (file_segments.empty()) {
+        auto key_path = get_path_in_local_cache(key);
+
+        _files.erase(file_key);
+
+        /// Note: it is guaranteed that there is no concurrency with files deletion,
+        /// because cache files are deleted only inside IFileCache and under cache lock.
+        if (fs::exists(key_path)) {
+            std::error_code ec;
+            fs::remove_all(key_path, ec);
+            if (ec) {
+                LOG(WARNING) << ec.message();
+            }
+        }
+
+        return {};
+    }
+
+    FileBlocks result;
+    auto segment_it = file_segments.lower_bound(range.left);
+    if (segment_it == file_segments.end()) {
+        /// N - last cached segment for given file key, segment{N}.offset < range.left:
+        ///   segment{N}                       segment{N}
+        /// [________                         [_______]
+        ///     [__________]         OR                  [________]
+        ///     ^                                        ^
+        ///     range.left                               range.left
+
+        const auto& cell = file_segments.rbegin()->second;
+        if (cell.file_segment->range().right < range.left) {
+            return {};
+        }
+
+        use_cell(cell, query_id, is_persistent, result, cache_lock);
+    } else { /// segment_it <-- segmment{k}
+        if (segment_it != file_segments.begin()) {
+            const auto& prev_cell = std::prev(segment_it)->second;
+            const auto& prev_cell_range = prev_cell.file_segment->range();
+
+            if (range.left <= prev_cell_range.right) {
+                ///   segment{k-1}  segment{k}
+                ///   [________]   [_____
+                ///       [___________
+                ///       ^
+                ///       range.left
+
+                use_cell(prev_cell, query_id, is_persistent, result, cache_lock);
+            }
+        }
+
+        ///  segment{k} ...       segment{k-1}  segment{k}                      segment{k}
+        ///  [______              [______]     [____                        [________
+        ///  [_________     OR              [________      OR    [______]   ^
+        ///  ^                              ^                           ^   segment{k}.offset
+        ///  range.left                     range.left                  range.right
+
+        while (segment_it != file_segments.end()) {
+            const auto& cell = segment_it->second;
+            if (range.right < cell.file_segment->range().left) {
+                break;
+            }
+
+            use_cell(cell, query_id, is_persistent, result, cache_lock);
+            ++segment_it;
+        }
+    }
+
+    return result;
+}
+
+FileBlocks LRUFileCache::split_range_into_cells(const Key& key, const TUniqueId& query_id,
+                                                bool is_persistent, size_t offset, size_t size,
+                                                FileBlock::State state,
+                                                std::lock_guard<std::mutex>& cache_lock) {
+    DCHECK(size > 0);
+
+    auto current_pos = offset;
+    auto end_pos_non_included = offset + size;
+
+    size_t current_size = 0;
+    size_t remaining_size = size;
+
+    FileBlocks file_segments;
+    while (current_pos < end_pos_non_included) {
+        current_size = std::min(remaining_size, _max_file_segment_size);
+        remaining_size -= current_size;
+        state = try_reserve(key, query_id, is_persistent, current_pos, current_size, cache_lock)
+                        ? state
+                        : FileBlock::State::SKIP_CACHE;
+        if (UNLIKELY(state == FileBlock::State::SKIP_CACHE)) {
+            auto file_segment =
+                    std::make_shared<FileBlock>(current_pos, current_size, key, this,
+                                                FileBlock::State::SKIP_CACHE, is_persistent);
+            file_segments.push_back(std::move(file_segment));
+        } else {
+            auto* cell = add_cell(key, is_persistent, current_pos, current_size, state, cache_lock);
+            if (cell) {
+                file_segments.push_back(cell->file_segment);
+            }
+        }
+
+        current_pos += current_size;
+    }
+
+    DCHECK(file_segments.empty() || offset + size - 1 == file_segments.back()->range().right);
+    return file_segments;
+}
+
+void LRUFileCache::fill_holes_with_empty_file_segments(FileBlocks& file_segments, const Key& key,
+                                                       const TUniqueId& query_id,
+                                                       bool is_persistent,
+                                                       const FileBlock::Range& range,
+                                                       std::lock_guard<std::mutex>& cache_lock) {
+    /// There are segments [segment1, ..., segmentN]
+    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
+    /// intersect with given range.
+
+    /// It can have holes:
+    /// [____________________]         -- requested range
+    ///     [____]  [_]   [_________]  -- intersecting cache [segment1, ..., segmentN]
+    ///
+    /// For each such hole create a cell with file segment state EMPTY.
+
+    auto it = file_segments.begin();
+    auto segment_range = (*it)->range();
+
+    size_t current_pos;
+    if (segment_range.left < range.left) {
+        ///    [_______     -- requested range
+        /// [_______
+        /// ^
+        /// segment1
+
+        current_pos = segment_range.right + 1;
+        ++it;
+    } else {
+        current_pos = range.left;
+    }
+
+    while (current_pos <= range.right && it != file_segments.end()) {
+        segment_range = (*it)->range();
+
+        if (current_pos == segment_range.left) {
+            current_pos = segment_range.right + 1;
+            ++it;
+            continue;
+        }
+
+        DCHECK(current_pos < segment_range.left);
+
+        auto hole_size = segment_range.left - current_pos;
+
+        file_segments.splice(
+                it, split_range_into_cells(key, query_id, is_persistent, current_pos, hole_size,
+                                           FileBlock::State::EMPTY, cache_lock));
+
+        current_pos = segment_range.right + 1;
+        ++it;
+    }
+
+    if (current_pos <= range.right) {
+        ///   ________]     -- requested range
+        ///   _____]
+        ///        ^
+        /// segmentN
+
+        auto hole_size = range.right - current_pos + 1;
+
+        file_segments.splice(
+                file_segments.end(),
+                split_range_into_cells(key, query_id, is_persistent, current_pos, hole_size,
+                                       FileBlock::State::EMPTY, cache_lock));
+    }
+}
+
+FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, size_t size,
+                                          bool is_persistent, const TUniqueId& query_id) {
+    FileBlock::Range range(offset, offset + size - 1);
+
+    std::lock_guard cache_lock(_mutex);
+
+    /// Get all segments which intersect with the given range.
+    auto file_segments = get_impl(key, query_id, is_persistent, range, cache_lock);
+
+    if (file_segments.empty()) {
+        file_segments = split_range_into_cells(key, query_id, is_persistent, offset, size,
+                                               FileBlock::State::EMPTY, cache_lock);
+    } else {
+        fill_holes_with_empty_file_segments(file_segments, key, query_id, is_persistent, range,
+                                            cache_lock);
+    }
+
+    DCHECK(!file_segments.empty());
+    return FileBlocksHolder(std::move(file_segments));
+}
+
+LRUFileCache::FileBlockCell* LRUFileCache::add_cell(const Key& key, bool is_persistent,
+                                                    size_t offset, size_t size,
+                                                    FileBlock::State state,
+                                                    std::lock_guard<std::mutex>& cache_lock) {
+    /// Create a file segment cell and put it in `files` map by [key][offset].
+    if (size == 0) {
+        return nullptr; /// Empty files are not cached.
+    }
+    auto file_key = std::make_pair(key, is_persistent);
+    LRUQueue* queue = is_persistent ? &_persistent_queue : &_queue;
+    DCHECK(_files[file_key].count(offset) == 0)
+            << "Cache already exists for key: " << key.to_string() << ", offset: " << offset
+            << ", size: " << size << ".\nCurrent cache structure: "
+            << dump_structure_unlocked(key, is_persistent, cache_lock);
+
+    auto& offsets = _files[file_key];
+    if (offsets.empty()) {
+        auto key_path = get_path_in_local_cache(key);
+        if (!fs::exists(key_path)) {
+            std::error_code ec;
+            fs::create_directories(key_path, ec);
+            if (ec) {
+                LOG(WARNING) << fmt::format("cannot create {}: {}", key_path,
+                                            std::strerror(ec.value()));
+                state = FileBlock::State::SKIP_CACHE;
+            }
+        }
+    }
+
+    FileBlockCell cell(std::make_shared<FileBlock>(offset, size, key, this, state, is_persistent),
+                       this, cache_lock);
+
+    cell.queue_iterator = queue->add(key, offset, is_persistent, size, cache_lock);
+    auto [it, inserted] = offsets.insert({offset, std::move(cell)});
+
+    DCHECK(inserted) << "Failed to insert into cache key: " << key.to_string()
+                     << ", offset: " << offset << ", size: " << size;
+
+    return &(it->second);
+}
+
+bool LRUFileCache::try_reserve(const Key& key, const TUniqueId& query_id, bool is_persistent,
+                               size_t offset, size_t size,
+                               std::lock_guard<std::mutex>& cache_lock) {
+    auto query_context = _enable_file_cache_query_limit && (query_id.hi != 0 || query_id.lo != 0)
+                                 ? get_query_context(query_id, cache_lock)
+                                 : nullptr;
+    if (!query_context) {
+        return try_reserve_for_main_list(key, nullptr, is_persistent, offset, size, cache_lock);
+    } else if (query_context->get_cache_size(cache_lock) + size <=
+               query_context->get_max_cache_size()) {
+        return try_reserve_for_main_list(key, query_context, is_persistent, offset, size,
+                                         cache_lock);
+    }
+    LRUQueue* queue = is_persistent ? &_persistent_queue : &_queue;
+    size_t removed_size = 0;
+    size_t queue_size = queue->get_elements_num(cache_lock);
+
+    std::vector<IFileCache::LRUQueue::Iterator> ghost;
+    std::vector<FileBlockCell*> trash;
+    std::vector<FileBlockCell*> to_evict;
+
+    size_t max_size = is_persistent ? _persistent_max_size : _max_size;
+    size_t max_element_size = is_persistent ? _persistent_max_element_size : _max_element_size;
+    auto is_overflow = [&] {
+        return (queue->get_total_cache_size(cache_lock) + size - removed_size > max_size) ||
+               queue_size > max_element_size ||
+               (query_context->get_cache_size(cache_lock) + size - removed_size >
+                query_context->get_max_cache_size());
+    };
+
+    /// Select the cache from the LRU queue held by query for expulsion.
+    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
+        if (!is_overflow()) {
+            break;
+        }
+
+        auto* cell = get_cell(iter->key, iter->is_persistent, iter->offset, cache_lock);
+
+        if (!cell) {
+            /// The cache corresponding to this record may be swapped out by
+            /// other queries, so it has become invalid.
+            ghost.push_back(iter);
+            removed_size += iter->size;
+        } else {
+            size_t cell_size = cell->size();
+            DCHECK(iter->size == cell_size);
+
+            if (cell->releasable()) {
+                auto& file_segment = cell->file_segment;
+                std::lock_guard segment_lock(file_segment->_mutex);
+
+                switch (file_segment->_download_state) {
+                case FileBlock::State::DOWNLOADED: {
+                    to_evict.push_back(cell);
+                    break;
+                }
+                default: {
+                    trash.push_back(cell);
+                    break;
+                }
+                }
+                removed_size += cell_size;
+                --queue_size;
+            }
+        }
+    }
+
+    auto remove_file_segment_if = [&](FileBlockCell* cell) {
+        FileBlockSPtr file_segment = cell->file_segment;
+        if (file_segment) {
+            size_t file_segment_size = cell->size();
+            query_context->remove(file_segment->key(), file_segment->offset(),
+                                  file_segment->is_persistent(), file_segment_size, cache_lock);
+
+            std::lock_guard segment_lock(file_segment->_mutex);
+            remove(file_segment->key(), file_segment->is_persistent(), file_segment->offset(),
+                   cache_lock, segment_lock);
+        }
+    };
+
+    for (auto& iter : ghost) {
+        query_context->remove(iter->key, iter->offset, iter->is_persistent, iter->size, cache_lock);
+    }
+
+    std::for_each(trash.begin(), trash.end(), remove_file_segment_if);
+    std::for_each(to_evict.begin(), to_evict.end(), remove_file_segment_if);
+
+    if (is_overflow()) {
+        return false;
+    }
+
+    query_context->reserve(key, offset, is_persistent, size, cache_lock);
+    return true;
+}
+
+bool LRUFileCache::try_reserve_for_main_list(const Key& key, QueryContextPtr query_context,
+                                             bool is_persistent, size_t offset, size_t size,
+                                             std::lock_guard<std::mutex>& cache_lock) {
+    LRUQueue* queue = is_persistent ? &_persistent_queue : &_queue;
+    auto removed_size = 0;
+    size_t queue_size = queue->get_elements_num(cache_lock);
+
+    size_t max_size = is_persistent ? _persistent_max_size : _max_size;
+    size_t max_element_size = is_persistent ? _persistent_max_element_size : _max_element_size;
+    auto is_overflow = [&] {
+        return (queue->get_total_cache_size(cache_lock) + size - removed_size > max_size) ||
+               queue_size >= max_element_size;
+    };
+
+    std::vector<FileBlockCell*> to_evict;
+    std::vector<FileBlockCell*> trash;
+
+    for (const auto& [entry_key, entry_offset, entry_size, _] : *queue) {
+        if (!is_overflow()) {
+            break;
+        }
+        auto* cell = get_cell(entry_key, is_persistent, entry_offset, cache_lock);
+
+        DCHECK(cell) << "Cache became inconsistent. Key: " << key.to_string()
+                     << ", offset: " << offset;
+
+        size_t cell_size = cell->size();
+        DCHECK(entry_size == cell_size);
+
+        /// It is guaranteed that cell is not removed from cache as long as
+        /// pointer to corresponding file segment is hold by any other thread.
+
+        if (cell->releasable()) {
+            auto& file_segment = cell->file_segment;
+
+            std::lock_guard segment_lock(file_segment->_mutex);
+
+            switch (file_segment->_download_state) {
+            case FileBlock::State::DOWNLOADED: {
+                /// Cell will actually be removed only if
+                /// we managed to reserve enough space.
+
+                to_evict.push_back(cell);
+                break;
+            }
+            default: {
+                trash.push_back(cell);
+                break;
+            }
+            }
+
+            removed_size += cell_size;
+            --queue_size;
+        }
+    }
+
+    auto remove_file_segment_if = [&](FileBlockCell* cell) {
+        FileBlockSPtr file_segment = cell->file_segment;
+        if (file_segment) {
+            std::lock_guard segment_lock(file_segment->_mutex);
+            remove(file_segment->key(), file_segment->is_persistent(), file_segment->offset(),
+                   cache_lock, segment_lock);
+        }
+    };
+
+    std::for_each(trash.begin(), trash.end(), remove_file_segment_if);
+    std::for_each(to_evict.begin(), to_evict.end(), remove_file_segment_if);
+
+    if (is_overflow()) {
+        return false;
+    }
+
+    if (query_context) {
+        query_context->reserve(key, offset, is_persistent, size, cache_lock);
+    }
+    return true;
+}
+
+void LRUFileCache::remove_if_exists(const Key& key, bool is_persistent) {
+    std::lock_guard cache_lock(_mutex);
+
+    auto file_key = std::make_pair(key, is_persistent);
+    auto it = _files.find(file_key);
+    if (it == _files.end()) {
+        return;
+    }
+
+    auto& offsets = it->second;
+
+    std::vector<FileBlockCell*> to_remove;
+    to_remove.reserve(offsets.size());
+
+    for (auto& [offset, cell] : offsets) {
+        to_remove.push_back(&cell);
+    }
+
+    bool some_cells_were_skipped = false;
+    for (auto& cell : to_remove) {
+        /// In ordinary case we remove data from cache when it's not used by anyone.
+        /// But if we have multiple replicated zero-copy tables on the same server
+        /// it became possible to start removing something from cache when it is used
+        /// by other "zero-copy" tables. That is why it's not an error.
+        if (!cell->releasable()) {
+            some_cells_were_skipped = true;
+            continue;
+        }
+
+        auto file_segment = cell->file_segment;
+        if (file_segment) {
+            std::lock_guard<std::mutex> segment_lock(file_segment->_mutex);
+            remove(file_segment->key(), is_persistent, file_segment->offset(), cache_lock,
+                   segment_lock);
+        }
+    }
+
+    auto key_path = get_path_in_local_cache(key);
+
+    if (!some_cells_were_skipped) {
+        _files.erase(file_key);
+
+        if (fs::exists(key_path)) {
+            std::error_code ec;
+            fs::remove_all(key_path, ec);
+            if (ec) {
+                LOG(WARNING) << ec.message();
+            }
+        }
+    }
+}
+
+void LRUFileCache::remove_if_releasable(bool is_persistent) {
+    /// Try remove all cached files by cache_base_path.
+    /// Only releasable file segments are evicted.
+    /// `remove_persistent_files` defines whether non-evictable by some criteria files
+    /// (they do not comply with the cache eviction policy) should also be removed.
+
+    std::lock_guard cache_lock(_mutex);
+    LRUQueue* queue = is_persistent ? &_persistent_queue : &_queue;
+    std::vector<FileBlock*> to_remove;
+    for (auto it = queue->begin(); it != queue->end();) {
+        const auto& [key, offset, size, _] = *it++;
+        auto* cell = get_cell(key, is_persistent, offset, cache_lock);
+
+        DCHECK(cell) << "Cache is in inconsistent state: LRU queue contains entries with no "
+                        "cache cell";
+
+        if (cell->releasable()) {
+            auto file_segment = cell->file_segment;
+            if (file_segment) {
+                std::lock_guard segment_lock(file_segment->_mutex);
+                remove(file_segment->key(), is_persistent, file_segment->offset(), cache_lock,
+                       segment_lock);
+            }
+        }
+    }
+}
+
+void LRUFileCache::remove(const Key& key, bool is_persistent, size_t offset,
+                          std::lock_guard<std::mutex>& cache_lock,
+                          std::lock_guard<std::mutex>& /* segment_lock */) {
+    LRUQueue* queue = is_persistent ? &_persistent_queue : &_queue;
+    auto* cell = get_cell(key, is_persistent, offset, cache_lock);
+    DCHECK(cell) << "No cache cell for key: " << key.to_string() << ", offset: " << offset;
+
+    if (cell->queue_iterator) {
+        queue->remove(*cell->queue_iterator, cache_lock);
+    }
+    auto file_key = std::make_pair(key, is_persistent);
+    auto& offsets = _files[file_key];
+    offsets.erase(offset);
+
+    auto cache_file_path = get_path_in_local_cache(key, offset, is_persistent);
+    if (fs::exists(cache_file_path)) {
+        std::error_code ec;
+        fs::remove(cache_file_path, ec);
+        if (ec) {
+            LOG(WARNING) << ec.message();
+        }
+
+        if (_is_initialized && offsets.empty()) {
+            auto key_path = get_path_in_local_cache(key);
+
+            _files.erase(file_key);
+
+            auto another_key = std::make_pair(key, !is_persistent);
+            if (_files.count(another_key) < 1 && fs::exists(key_path)) {
+                std::error_code ec;
+                fs::remove_all(key_path, ec);
+                if (ec) {
+                    LOG(WARNING) << ec.message();
+                }
+            }
+        }
+    }
+}
+
+void LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& cache_lock) {
+    Key key;
+    uint64_t offset = 0;
+    size_t size = 0;
+    std::vector<std::pair<LRUQueue::Iterator, bool>> queue_entries;
+
+    /// cache_base_path / key / offset
+    fs::directory_iterator key_it {_cache_base_path};
+    for (; key_it != fs::directory_iterator(); ++key_it) {
+        key = Key(vectorized::unhex_uint<uint128_t>(key_it->path().filename().native().c_str()));
+
+        fs::directory_iterator offset_it {key_it->path()};
+        for (; offset_it != fs::directory_iterator(); ++offset_it) {
+            auto offset_with_suffix = offset_it->path().filename().native();
+            auto delim_pos = offset_with_suffix.find('_');
+            bool is_persistent = false;
+            bool parsed = true;
+            try {
+                if (delim_pos == std::string::npos) {
+                    offset = stoull(offset_with_suffix);
+                } else {
+                    offset = stoull(offset_with_suffix.substr(0, delim_pos));
+                    is_persistent = offset_with_suffix.substr(delim_pos + 1) == "persistent";
+                }
+            } catch (...) {
+                parsed = false;
+            }
+
+            if (!parsed) {
+                LOG(WARNING) << "Unexpected file: " << offset_it->path().native();
+                continue; /// Or just remove? Some unexpected file.
+            }
+
+            size = offset_it->file_size();
+            if (size == 0) {
+                std::error_code ec;
+                fs::remove(offset_it->path(), ec);
+                if (ec) {
+                    LOG(WARNING) << ec.message();
+                }
+                continue;
+            }
+
+            if (try_reserve(key, TUniqueId(), is_persistent, offset, size, cache_lock)) {
+                auto* cell = add_cell(key, is_persistent, offset, size,
+                                      FileBlock::State::DOWNLOADED, cache_lock);
+                if (cell) {
+                    queue_entries.emplace_back(*cell->queue_iterator, is_persistent);
+                }
+            } else {
+                LOG(WARNING) << "Cache capacity changed (max size: " << _max_size << ", available: "
+                             << get_available_cache_size_unlocked(is_persistent, cache_lock)
+                             << "), cached file " << key_it->path().string()
+                             << " does not fit in cache anymore (size: " << size << ")";
+                std::error_code ec;
+                fs::remove(offset_it->path(), ec);
+                if (ec) {
+                    LOG(WARNING) << ec.message();
+                }
+            }
+        }
+    }
+
+    /// Shuffle cells to have random order in LRUQueue as at startup all cells have the same priority.
+    auto rng = std::default_random_engine(
+            static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
+    std::shuffle(queue_entries.begin(), queue_entries.end(), rng);
+    for (const auto& [it, is_persistent] : queue_entries) {
+        LRUQueue* queue = is_persistent ? &_persistent_queue : &_queue;
+        queue->move_to_end(it, cache_lock);
+    }
+}
+
+std::vector<std::string> LRUFileCache::try_get_cache_paths(const Key& key, bool is_persistent) {
+    std::lock_guard cache_lock(_mutex);
+
+    std::vector<std::string> cache_paths;
+
+    const auto& cells_by_offset = _files[std::make_pair(key, is_persistent)];
+
+    for (const auto& [offset, cell] : cells_by_offset) {
+        if (cell.file_segment->state() == FileBlock::State::DOWNLOADED) {
+            cache_paths.push_back(get_path_in_local_cache(key, offset, is_persistent));
+        }
+    }
+
+    return cache_paths;
+}
+
+size_t LRUFileCache::get_used_cache_size(bool is_persistent) const {
+    std::lock_guard cache_lock(_mutex);
+    return get_used_cache_size_unlocked(is_persistent, cache_lock);
+}
+
+size_t LRUFileCache::get_used_cache_size_unlocked(bool is_persistent,
+                                                  std::lock_guard<std::mutex>& cache_lock) const {
+    return is_persistent ? _persistent_queue.get_total_cache_size(cache_lock)
+                         : _queue.get_total_cache_size(cache_lock);
+}
+
+size_t LRUFileCache::get_available_cache_size(bool is_persistent) const {
+    std::lock_guard cache_lock(_mutex);
+    return get_available_cache_size_unlocked(is_persistent, cache_lock);
+}
+
+size_t LRUFileCache::get_available_cache_size_unlocked(
+        bool is_persistent, std::lock_guard<std::mutex>& cache_lock) const {
+    size_t max_size = is_persistent ? _persistent_max_size : _max_size;
+    return max_size - get_used_cache_size_unlocked(is_persistent, cache_lock);
+}
+
+size_t LRUFileCache::get_file_segments_num(bool is_persistent) const {
+    std::lock_guard cache_lock(_mutex);
+    return get_file_segments_num_unlocked(is_persistent, cache_lock);
+}
+
+size_t LRUFileCache::get_file_segments_num_unlocked(bool is_persistent,
+                                                    std::lock_guard<std::mutex>& cache_lock) const {
+    const LRUQueue* queue = is_persistent ? &_persistent_queue : &_queue;
+    return queue->get_elements_num(cache_lock);
+}
+
+LRUFileCache::FileBlockCell::FileBlockCell(FileBlockSPtr file_segment_, LRUFileCache* cache,
+                                           std::lock_guard<std::mutex>& cache_lock)
+        : file_segment(file_segment_) {
+    /**
+     * Cell can be created with either DOWNLOADED or EMPTY file segment's state.
+     * File segment acquires DOWNLOADING state and creates LRUQueue iterator on first
+     * successful getOrSetDownaloder call.
+     */
+
+    switch (file_segment->_download_state) {
+    case FileBlock::State::DOWNLOADED:
+    case FileBlock::State::EMPTY:
+    case FileBlock::State::SKIP_CACHE: {
+        break;
+    }
+    default:
+        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
+                      << FileBlock::state_to_string(file_segment->_download_state);
+    }
+}
+
+IFileCache::LRUQueue::Iterator IFileCache::LRUQueue::add(
+        const IFileCache::Key& key, size_t offset, bool is_persistent, size_t size,
+        std::lock_guard<std::mutex>& /* cache_lock */) {
+    cache_size += size;
+    return queue.insert(queue.end(), FileKeyAndOffset(key, offset, size, is_persistent));
+}
+
+void IFileCache::LRUQueue::remove(Iterator queue_it,
+                                  std::lock_guard<std::mutex>& /* cache_lock */) {
+    cache_size -= queue_it->size;
+    queue.erase(queue_it);
+}
+
+void IFileCache::LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
+    queue.clear();
+    cache_size = 0;
+}
+
+void IFileCache::LRUQueue::move_to_end(Iterator queue_it,
+                                       std::lock_guard<std::mutex>& /* cache_lock */) {
+    queue.splice(queue.end(), queue, queue_it);
+}
+bool IFileCache::LRUQueue::contains(const IFileCache::Key& key, size_t offset,
+                                    std::lock_guard<std::mutex>& /* cache_lock */) const {
+    /// This method is used for assertions in debug mode.
+    /// So we do not care about complexity here.
+    for (const auto& [entry_key, entry_offset, _, size] : queue) {
+        if (key == entry_key && offset == entry_offset) {
+            return true;
+        }
+    }
+    return false;
+}
+
+std::string IFileCache::LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
+    std::string result;
+    for (const auto& [key, offset, _, size] : queue) {
+        if (!result.empty()) {
+            result += ", ";
+        }
+        result += fmt::format("{}: [{}, {}]", key.to_string(), offset, offset + size - 1);
+    }
+    return result;
+}
+
+std::string LRUFileCache::dump_structure(const Key& key, bool is_persistent) {
+    std::lock_guard cache_lock(_mutex);
+    return dump_structure_unlocked(key, is_persistent, cache_lock);
+}
+
+std::string LRUFileCache::dump_structure_unlocked(const Key& key, bool is_persistent,
+                                                  std::lock_guard<std::mutex>& cache_lock) {
+    std::stringstream result;
+    const auto& cells_by_offset = _files[std::make_pair(key, is_persistent)];
+
+    for (const auto& [offset, cell] : cells_by_offset) {
+        result << cell.file_segment->get_info_for_log() << "\n";
+    }
+
+    result << "\n\nQueue: " << _queue.to_string(cache_lock);
+    return result.str();
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/block/block_lru_file_cache.h b/be/src/io/cache/block/block_lru_file_cache.h
new file mode 100644
index 0000000000..1d01677e23
--- /dev/null
+++ b/be/src/io/cache/block/block_lru_file_cache.h
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/LRUFileCachePriority.h
+// and modified by Doris
+
+#pragma once
+
+#include <chrono>
+#include <map>
+#include <memory>
+#include <optional>
+#include <unordered_map>
+
+#include "io/cache/block/block_file_cache.h"
+#include "io/cache/block/block_file_segment.h"
+
+namespace doris {
+namespace io {
+
+/**
+ * Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments.
+ * Implements LRU eviction policy.
+ */
+class LRUFileCache final : public IFileCache {
+public:
+    /**
+     * cache_base_path: the file cache path
+     * cache_settings: the file cache setttings
+     */
+    LRUFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings);
+
+    /**
+     * get the files which range contain [offset, offset+size-1]
+     */
+    FileBlocksHolder get_or_set(const Key& key, size_t offset, size_t size, bool is_persistent,
+                                const TUniqueId& query_id) override;
+
+    // init file cache
+    Status initialize() override;
+
+    // remove the files belong to key
+    void remove_if_exists(const Key& key, bool is_persistent) override;
+
+    // remove the files only catched by cache
+    void remove_if_releasable(bool is_persistent) override;
+
+    std::vector<std::string> try_get_cache_paths(const Key& key, bool is_persistent) override;
+
+    size_t get_used_cache_size(bool is_persistent) const override;
+
+    size_t get_file_segments_num(bool is_persistent) const override;
+
+private:
+    struct FileBlockCell {
+        FileBlockSPtr file_segment;
+
+        /// Iterator is put here on first reservation attempt, if successful.
+        std::optional<LRUQueue::Iterator> queue_iterator;
+
+        /// Pointer to file segment is always hold by the cache itself.
+        /// Apart from pointer in cache, it can be hold by cache users, when they call
+        /// getorSet(), but cache users always hold it via FileBlocksHolder.
+        bool releasable() const { return file_segment.unique(); }
+
+        size_t size() const { return file_segment->_segment_range.size(); }
+
+        FileBlockCell(FileBlockSPtr file_segment_, LRUFileCache* cache,
+                      std::lock_guard<std::mutex>& cache_lock);
+
+        FileBlockCell(FileBlockCell&& other) noexcept
+                : file_segment(std::move(other.file_segment)),
+                  queue_iterator(other.queue_iterator) {}
+
+        FileBlockCell& operator=(const FileBlockCell&) = delete;
+        FileBlockCell(const FileBlockCell&) = delete;
+    };
+
+    using FileBlocksByOffset = std::map<size_t, FileBlockCell>;
+
+    struct HashCachedFileKey {
+        std::size_t operator()(const std::pair<Key, bool>& k) const { return KeyHash()(k.first); }
+    };
+    // key: <file key, is_persistent>
+    using CachedFiles =
+            std::unordered_map<std::pair<Key, bool>, FileBlocksByOffset, HashCachedFileKey>;
+
+    CachedFiles _files;
+    LRUQueue _queue;
+    LRUQueue _persistent_queue;
+
+    FileBlocks get_impl(const Key& key, const TUniqueId& query_id, bool is_persistent,
+                        const FileBlock::Range& range, std::lock_guard<std::mutex>& cache_lock);
+
+    FileBlockCell* get_cell(const Key& key, bool is_persistent, size_t offset,
+                            std::lock_guard<std::mutex>& cache_lock);
+
+    FileBlockCell* add_cell(const Key& key, bool is_persistent, size_t offset, size_t size,
+                            FileBlock::State state, std::lock_guard<std::mutex>& cache_lock);
+
+    void use_cell(const FileBlockCell& cell, const TUniqueId& query_id, bool is_persistent,
+                  FileBlocks& result, std::lock_guard<std::mutex>& cache_lock);
+
+    bool try_reserve(const Key& key, const TUniqueId& query_id, bool is_persistent, size_t offset,
+                     size_t size, std::lock_guard<std::mutex>& cache_lock) override;
+
+    bool try_reserve_for_main_list(const Key& key, QueryContextPtr query_context,
+                                   bool is_persistent, size_t offset, size_t size,
+                                   std::lock_guard<std::mutex>& cache_lock);
+
+    void remove(const Key& key, bool is_persistent, size_t offset,
+                std::lock_guard<std::mutex>& cache_lock,
+                std::lock_guard<std::mutex>& segment_lock) override;
+
+    size_t get_available_cache_size(bool is_persistent) const;
+
+    void load_cache_info_into_memory(std::lock_guard<std::mutex>& cache_lock);
+
+    FileBlocks split_range_into_cells(const Key& key, const TUniqueId& query_id, bool is_persistent,
+                                      size_t offset, size_t size, FileBlock::State state,
+                                      std::lock_guard<std::mutex>& cache_lock);
+
+    std::string dump_structure_unlocked(const Key& key, bool is_persistent,
+                                        std::lock_guard<std::mutex>& cache_lock);
+
+    void fill_holes_with_empty_file_segments(FileBlocks& file_segments, const Key& key,
+                                             const TUniqueId& query_id, bool is_persistent,
+                                             const FileBlock::Range& range,
+                                             std::lock_guard<std::mutex>& cache_lock);
+
+    size_t get_used_cache_size_unlocked(bool is_persistent,
+                                        std::lock_guard<std::mutex>& cache_lock) const;
+
+    size_t get_available_cache_size_unlocked(bool is_persistent,
+                                             std::lock_guard<std::mutex>& cache_lock) const;
+
+    size_t get_file_segments_num_unlocked(bool is_persistent,
+                                          std::lock_guard<std::mutex>& cache_lock) const;
+
+public:
+    std::string dump_structure(const Key& key, bool is_persistent) override;
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp b/be/src/io/cache/block/cached_remote_file_reader.cpp
new file mode 100644
index 0000000000..cbdcfcdfa5
--- /dev/null
+++ b/be/src/io/cache/block/cached_remote_file_reader.cpp
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/block/cached_remote_file_reader.h"
+
+#include "io/cache/block/block_file_cache.h"
+#include "io/cache/block/block_file_cache_factory.h"
+#include "io/fs/file_reader.h"
+#include "olap/iterators.h"
+#include "olap/olap_common.h"
+
+namespace doris {
+namespace io {
+
+CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader,
+                                               const std::string& cache_path, IOContext* io_ctx)
+        : _remote_file_reader(std::move(remote_file_reader)), _io_ctx(io_ctx) {
+    _cache_key = IFileCache::hash(cache_path);
+    _cache = FileCacheFactory::instance().get_by_path(_cache_key);
+    _disposable_cache = FileCacheFactory::instance().get_disposable_cache(_cache_key);
+}
+
+CachedRemoteFileReader::~CachedRemoteFileReader() {
+    close();
+}
+
+Status CachedRemoteFileReader::close() {
+    return _remote_file_reader->close();
+}
+
+std::pair<size_t, size_t> CachedRemoteFileReader::_align_size(size_t offset,
+                                                              size_t read_size) const {
+    size_t left = offset;
+    size_t right = offset + read_size - 1;
+    size_t align_left = (left / config::file_cache_max_file_segment_size) *
+                        config::file_cache_max_file_segment_size;
+    size_t align_right = (right / config::file_cache_max_file_segment_size + 1) *
+                         config::file_cache_max_file_segment_size;
+    align_right = align_right < size() ? align_right : size();
+    size_t align_size = align_right - align_left;
+    return std::make_pair(align_left, align_size);
+}
+
+Status CachedRemoteFileReader::read_at(size_t offset, Slice result, const IOContext& io_ctx,
+                                       size_t* bytes_read) {
+    if (bthread_self() == 0) {
+        return read_at_impl(offset, result, io_ctx, bytes_read);
+    }
+    return Status::NotSupported("Not Support bthread");
+}
+
+Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result,
+                                            const IOContext& /*io_ctx*/, size_t* bytes_read) {
+    DCHECK(!closed());
+    DCHECK(_io_ctx);
+    if (offset > size()) {
+        return Status::IOError(
+                fmt::format("offset exceeds file size(offset: {), file size: {}, path: {})", offset,
+                            size(), path().native()));
+    }
+    size_t bytes_req = result.size;
+    bytes_req = std::min(bytes_req, size() - offset);
+    if (UNLIKELY(bytes_req == 0)) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+    CloudFileCachePtr cache = _io_ctx->use_disposable_cache ? _disposable_cache : _cache;
+    // cache == nullptr since use_disposable_cache = true and don't set  disposable cache in conf
+    if (cache == nullptr) {
+        return _remote_file_reader->read_at(offset, result, *_io_ctx, bytes_read);
+    }
+    ReadStatistics stats;
+    stats.bytes_read = bytes_req;
+    // if state == nullptr, the method is called for read footer
+    // if state->read_segment_index, read all the end of file
+    size_t align_left = offset, align_size = size() - offset;
+    if (!_io_ctx->read_segment_index) {
+        auto pair = _align_size(offset, bytes_req);
+        align_left = pair.first;
+        align_size = pair.second;
+        DCHECK((align_left % config::file_cache_max_file_segment_size) == 0);
+    }
+    bool is_persistent = _io_ctx->is_persistent;
+    TUniqueId query_id = _io_ctx->query_id ? *(_io_ctx->query_id) : TUniqueId();
+    FileBlocksHolder holder =
+            cache->get_or_set(_cache_key, align_left, align_size, is_persistent, query_id);
+    std::vector<FileBlockSPtr> empty_segments;
+    for (auto& segment : holder.file_segments) {
+        if (segment->state() == FileBlock::State::EMPTY) {
+            segment->get_or_set_downloader();
+            if (segment->is_downloader()) {
+                empty_segments.push_back(segment);
+            }
+        } else if (segment->state() == FileBlock::State::SKIP_CACHE) {
+            empty_segments.push_back(segment);
+            stats.bytes_skip_cache += segment->range().size();
+        }
+    }
+
+    size_t empty_start = 0;
+    size_t empty_end = 0;
+    if (!empty_segments.empty()) {
+        empty_start = empty_segments.front()->range().left;
+        empty_end = empty_segments.back()->range().right;
+        size_t size = empty_end - empty_start + 1;
+        std::unique_ptr<char[]> buffer(new char[size]);
+        RETURN_IF_ERROR(_remote_file_reader->read_at(empty_start, Slice(buffer.get(), size),
+                                                     *_io_ctx, &size));
+        for (auto& segment : empty_segments) {
+            if (segment->state() == FileBlock::State::SKIP_CACHE) {
+                continue;
+            }
+            char* cur_ptr = buffer.get() + segment->range().left - empty_start;
+            size_t segment_size = segment->range().size();
+            RETURN_IF_ERROR(segment->append(Slice(cur_ptr, segment_size)));
+            RETURN_IF_ERROR(segment->finalize_write());
+            stats.write_in_file_cache++;
+            stats.bytes_write_in_file_cache += segment_size;
+        }
+        // copy from memory directly
+        size_t right_offset = offset + result.size - 1;
+        if (empty_start <= right_offset && empty_end >= offset) {
+            size_t copy_left_offset = offset < empty_start ? empty_start : offset;
+            size_t copy_right_offset = right_offset < empty_end ? right_offset : empty_end;
+            char* dst = result.data + (copy_left_offset - offset);
+            char* src = buffer.get() + (copy_left_offset - empty_start);
+            size_t copy_size = copy_right_offset - copy_left_offset + 1;
+            memcpy(dst, src, copy_size);
+        }
+    } else {
+        stats.hit_cache = true;
+    }
+
+    size_t current_offset = offset;
+    size_t end_offset = offset + bytes_req - 1;
+    *bytes_read = 0;
+    for (auto& segment : holder.file_segments) {
+        if (current_offset > end_offset) {
+            break;
+        }
+        size_t left = segment->range().left;
+        size_t right = segment->range().right;
+        if (right < offset) {
+            continue;
+        }
+        size_t read_size =
+                end_offset > right ? right - current_offset + 1 : end_offset - current_offset + 1;
+        if (empty_start <= left && right <= empty_end) {
+            *bytes_read += read_size;
+            current_offset = right + 1;
+            continue;
+        }
+        FileBlock::State segment_state;
+        int64_t wait_time = 0;
+        static int64_t MAX_WAIT_TIME = 10;
+        do {
+            segment_state = segment->wait();
+            if (segment_state == FileBlock::State::DOWNLOADED) {
+                break;
+            }
+            if (segment_state != FileBlock::State::DOWNLOADING) {
+                return Status::IOError(
+                        "File Cache State is {}, the cache downloader encounters an error, please "
+                        "retry it",
+                        segment_state);
+            }
+        } while (++wait_time < MAX_WAIT_TIME);
+        if (UNLIKELY(wait_time) == MAX_WAIT_TIME) {
+            return Status::IOError("Waiting too long for the download to complete");
+        }
+        size_t file_offset = current_offset - left;
+        RETURN_IF_ERROR(segment->read_at(Slice(result.data + (current_offset - offset), read_size),
+                                         file_offset));
+        stats.bytes_read_from_file_cache += read_size;
+        *bytes_read += read_size;
+        current_offset = right + 1;
+    }
+    DCHECK(*bytes_read == bytes_req);
+    _update_state(stats, _io_ctx->file_cache_stats);
+    DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
+    return Status::OK();
+}
+
+void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
+                                           FileCacheStatistics* statis) const {
+    if (statis == nullptr) {
+        return;
+    }
+    statis->num_io_total++;
+    statis->num_io_bytes_read_total += read_stats.bytes_read;
+    statis->num_io_bytes_written_in_file_cache += read_stats.bytes_write_in_file_cache;
+    if (read_stats.hit_cache) {
+        statis->num_io_hit_cache++;
+    }
+    statis->num_io_bytes_read_from_file_cache += read_stats.bytes_read_from_file_cache;
+    statis->num_io_written_in_file_cache += read_stats.write_in_file_cache;
+    statis->num_io_bytes_skip_cache += read_stats.bytes_skip_cache;
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/block/cached_remote_file_reader.h b/be/src/io/cache/block/cached_remote_file_reader.h
new file mode 100644
index 0000000000..496f151404
--- /dev/null
+++ b/be/src/io/cache/block/cached_remote_file_reader.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "gutil/macros.h"
+#include "io/cache/block/block_file_cache.h"
+#include "io/cache/block/block_file_cache_fwd.h"
+#include "io/cache/block/block_file_cache_profile.h"
+#include "io/cache/block/block_file_segment.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/path.h"
+#include "io/fs/s3_file_system.h"
+
+namespace doris {
+namespace io {
+
+class CachedRemoteFileReader final : public FileReader {
+public:
+    CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const std::string& cache_path,
+                           IOContext* io_ctx);
+
+    ~CachedRemoteFileReader() override;
+
+    Status close() override;
+
+    Status read_at(size_t offset, Slice result, const IOContext& io_ctx,
+                   size_t* bytes_read) override;
+
+    Status read_at_impl(size_t offset, Slice result, const IOContext& io_ctx, size_t* bytes_read);
+
+    const Path& path() const override { return _remote_file_reader->path(); }
+
+    size_t size() const override { return _remote_file_reader->size(); }
+
+    bool closed() const override { return _remote_file_reader->closed(); }
+
+    FileSystemSPtr fs() const override { return _remote_file_reader->fs(); }
+
+private:
+    std::pair<size_t, size_t> _align_size(size_t offset, size_t size) const;
+
+    FileReaderSPtr _remote_file_reader;
+    IFileCache::Key _cache_key;
+    CloudFileCachePtr _cache;
+    CloudFileCachePtr _disposable_cache;
+
+    IOContext* _io_ctx;
+
+    struct ReadStatistics {
+        bool hit_cache = false;
+        int64_t bytes_read = 0;
+        int64_t bytes_read_from_file_cache = 0;
+        int64_t bytes_write_in_file_cache = 0;
+        int64_t write_in_file_cache = 0;
+        int64_t bytes_skip_cache = 0;
+    };
+    void _update_state(const ReadStatistics& stats, FileCacheStatistics* state) const;
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/dummy_file_cache.h b/be/src/io/cache/dummy_file_cache.h
index 863edc428e..aafce20fef 100644
--- a/be/src/io/cache/dummy_file_cache.h
+++ b/be/src/io/cache/dummy_file_cache.h
@@ -23,6 +23,7 @@
 
 #include "common/status.h"
 #include "io/cache/file_cache.h"
+#include "io/fs/file_system.h"
 #include "io/fs/path.h"
 
 namespace doris {
@@ -68,6 +69,8 @@ public:
 
     bool is_gc_finish() const override { return _gc_lru_queue.empty(); }
 
+    FileSystemSPtr fs() const override { return nullptr; }
+
 private:
     void _add_file_cache(const Path& data_file);
     void _load();
diff --git a/be/src/io/cache/file_cache_manager.cpp b/be/src/io/cache/file_cache_manager.cpp
index 59a12b0ea0..d3db6d082b 100644
--- a/be/src/io/cache/file_cache_manager.cpp
+++ b/be/src/io/cache/file_cache_manager.cpp
@@ -216,11 +216,11 @@ void FileCacheManager::gc_file_caches() {
 
 FileCachePtr FileCacheManager::new_file_cache(const std::string& cache_dir, int64_t alive_time_sec,
                                               io::FileReaderSPtr remote_file_reader,
-                                              io::FileCacheType cache_type) {
+                                              io::FileCachePolicy cache_type) {
     switch (cache_type) {
-    case io::FileCacheType::SUB_FILE_CACHE:
+    case io::FileCachePolicy::SUB_FILE_CACHE:
         return std::make_unique<WholeFileCache>(cache_dir, alive_time_sec, remote_file_reader);
-    case io::FileCacheType::WHOLE_FILE_CACHE:
+    case io::FileCachePolicy::WHOLE_FILE_CACHE:
         return std::make_unique<SubFileCache>(cache_dir, alive_time_sec, remote_file_reader);
     default:
         return nullptr;
diff --git a/be/src/io/cache/file_cache_manager.h b/be/src/io/cache/file_cache_manager.h
index b8200d964e..6bf49c0dc7 100644
--- a/be/src/io/cache/file_cache_manager.h
+++ b/be/src/io/cache/file_cache_manager.h
@@ -60,7 +60,7 @@ public:
 
     FileCachePtr new_file_cache(const std::string& cache_dir, int64_t alive_time_sec,
                                 io::FileReaderSPtr remote_file_reader,
-                                io::FileCacheType cache_type);
+                                io::FileCachePolicy cache_type);
 
     bool exist(const std::string& cache_path);
 
diff --git a/be/src/io/cache/sub_file_cache.cpp b/be/src/io/cache/sub_file_cache.cpp
index 3d692d2e44..97da383862 100644
--- a/be/src/io/cache/sub_file_cache.cpp
+++ b/be/src/io/cache/sub_file_cache.cpp
@@ -192,7 +192,7 @@ Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
         }
     }
     io::FileReaderSPtr cache_reader;
-    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &cache_reader));
+    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &cache_reader, nullptr));
     _cache_file_readers.emplace(offset, cache_reader);
     _last_match_times.emplace(offset, time(nullptr));
     LOG(INFO) << "Create cache file from remote file successfully: "
diff --git a/be/src/io/cache/sub_file_cache.h b/be/src/io/cache/sub_file_cache.h
index 828cde7191..dab350b6ce 100644
--- a/be/src/io/cache/sub_file_cache.h
+++ b/be/src/io/cache/sub_file_cache.h
@@ -22,6 +22,7 @@
 
 #include "common/status.h"
 #include "io/cache/file_cache.h"
+#include "io/fs/file_system.h"
 #include "io/fs/path.h"
 
 namespace doris {
@@ -60,6 +61,8 @@ public:
 
     bool is_gc_finish() const override { return _gc_lru_queue.empty(); }
 
+    FileSystemSPtr fs() const override { return _remote_file_reader->fs(); }
+
 private:
     Status _generate_cache_reader(size_t offset, size_t req_size);
 
diff --git a/be/src/io/cache/whole_file_cache.cpp b/be/src/io/cache/whole_file_cache.cpp
index 372823b9da..d15e285491 100644
--- a/be/src/io/cache/whole_file_cache.cpp
+++ b/be/src/io/cache/whole_file_cache.cpp
@@ -125,7 +125,8 @@ Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
             return st;
         }
     }
-    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &_cache_file_reader));
+    RETURN_IF_ERROR(
+            io::global_local_filesystem()->open_file(cache_file, &_cache_file_reader, nullptr));
     _cache_file_size = _cache_file_reader->size();
     LOG(INFO) << "Create cache file from remote file successfully: "
               << _remote_file_reader->path().native() << " -> " << cache_file.native();
diff --git a/be/src/io/cache/whole_file_cache.h b/be/src/io/cache/whole_file_cache.h
index d9ffbd9917..d4a5953364 100644
--- a/be/src/io/cache/whole_file_cache.h
+++ b/be/src/io/cache/whole_file_cache.h
@@ -22,6 +22,7 @@
 
 #include "common/status.h"
 #include "io/cache/file_cache.h"
+#include "io/fs/file_system.h"
 #include "io/fs/path.h"
 
 namespace doris {
@@ -58,6 +59,8 @@ public:
 
     bool is_gc_finish() const override;
 
+    FileSystemSPtr fs() const override { return _remote_file_reader->fs(); }
+
 private:
     Status _generate_cache_reader(size_t offset, size_t req_size);
 
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index d44a2add8e..4fbc6dc9f6 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -17,11 +17,13 @@
 
 #include "io/file_factory.h"
 
+#include "common/config.h"
 #include "common/status.h"
 #include "io/broker_reader.h"
 #include "io/broker_writer.h"
 #include "io/buffered_reader.h"
 #include "io/fs/broker_file_system.h"
+#include "io/fs/file_reader_options.h"
 #include "io/fs/file_system.h"
 #include "io/fs/hdfs_file_system.h"
 #include "io/fs/local_file_system.h"
@@ -148,36 +150,40 @@ Status FileFactory::create_file_reader(RuntimeProfile* profile, const TFileScanR
 Status FileFactory::create_file_reader(RuntimeProfile* /*profile*/,
                                        const FileSystemProperties& system_properties,
                                        const FileDescription& file_description,
-                                       std::unique_ptr<io::FileSystem>* file_system,
-                                       io::FileReaderSPtr* file_reader) {
+                                       std::shared_ptr<io::FileSystem>* file_system,
+                                       io::FileReaderSPtr* file_reader, IOContext* io_ctx) {
     TFileType::type type = system_properties.system_type;
-    io::FileSystem* file_system_ptr = nullptr;
+    std::string cache_policy = "no_cache";
+    if (config::enable_file_cache) {
+        cache_policy = "file_block_cache";
+    }
+    io::FileReaderOptions reader_options(io::cache_type_from_string(cache_policy),
+                                         io::FileBlockCachePathPolicy());
     switch (type) {
     case TFileType::FILE_LOCAL: {
-        RETURN_IF_ERROR(
-                io::global_local_filesystem()->open_file(file_description.path, file_reader));
+        RETURN_IF_ERROR(io::global_local_filesystem()->open_file(
+                file_description.path, reader_options, file_reader, io_ctx));
         break;
     }
     case TFileType::FILE_S3: {
         RETURN_IF_ERROR(create_s3_reader(system_properties.properties, file_description.path,
-                                         &file_system_ptr, file_reader));
+                                         file_system, file_reader, reader_options, io_ctx));
         break;
     }
     case TFileType::FILE_HDFS: {
         RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, file_description.path,
-                                           &file_system_ptr, file_reader));
+                                           file_system, file_reader, reader_options, io_ctx));
         break;
     }
     case TFileType::FILE_BROKER: {
         RETURN_IF_ERROR(create_broker_reader(system_properties.broker_addresses[0],
                                              system_properties.properties, file_description,
-                                             &file_system_ptr, file_reader));
+                                             file_system, file_reader, reader_options, io_ctx));
         break;
     }
     default:
         return Status::NotSupported("unsupported file reader type: {}", std::to_string(type));
     }
-    file_system->reset(file_system_ptr);
     return Status::OK();
 }
 
@@ -200,11 +206,13 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id,
 }
 
 Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
-                                       io::FileSystem** hdfs_file_system,
-                                       io::FileReaderSPtr* reader) {
-    *hdfs_file_system = new io::HdfsFileSystem(hdfs_params, "");
-    RETURN_IF_ERROR((dynamic_cast<io::HdfsFileSystem*>(*hdfs_file_system))->connect());
-    RETURN_IF_ERROR((*hdfs_file_system)->open_file(path, reader));
+                                       std::shared_ptr<io::FileSystem>* hdfs_file_system,
+                                       io::FileReaderSPtr* reader,
+                                       const io::FileReaderOptions& reader_options,
+                                       IOContext* io_ctx) {
+    hdfs_file_system->reset(new io::HdfsFileSystem(hdfs_params, ""));
+    RETURN_IF_ERROR((std::static_pointer_cast<io::HdfsFileSystem>(*hdfs_file_system))->connect());
+    RETURN_IF_ERROR((*hdfs_file_system)->open_file(path, reader_options, reader, io_ctx));
     return Status::OK();
 }
 
@@ -216,28 +224,36 @@ Status FileFactory::create_hdfs_writer(const std::map<std::string, std::string>&
 }
 
 Status FileFactory::create_s3_reader(const std::map<std::string, std::string>& prop,
-                                     const std::string& path, io::FileSystem** s3_file_system,
-                                     io::FileReaderSPtr* reader) {
+                                     const std::string& path,
+                                     std::shared_ptr<io::FileSystem>* s3_file_system,
+                                     io::FileReaderSPtr* reader,
+                                     const io::FileReaderOptions& reader_options,
+                                     IOContext* io_ctx) {
     S3URI s3_uri(path);
     if (!s3_uri.parse()) {
         return Status::InvalidArgument("s3 uri is invalid: {}", path);
     }
     S3Conf s3_conf;
     RETURN_IF_ERROR(ClientFactory::convert_properties_to_s3_conf(prop, s3_uri, &s3_conf));
-    *s3_file_system = new io::S3FileSystem(s3_conf, "");
-    RETURN_IF_ERROR((dynamic_cast<io::S3FileSystem*>(*s3_file_system))->connect());
-    RETURN_IF_ERROR((*s3_file_system)->open_file(s3_uri.get_key(), reader));
+    s3_file_system->reset(new io::S3FileSystem(s3_conf, ""));
+    RETURN_IF_ERROR((std::static_pointer_cast<io::S3FileSystem>(*s3_file_system))->connect());
+    RETURN_IF_ERROR((*s3_file_system)->open_file(s3_uri.get_key(), reader_options, reader, io_ctx));
     return Status::OK();
 }
 
 Status FileFactory::create_broker_reader(const TNetworkAddress& broker_addr,
                                          const std::map<std::string, std::string>& prop,
                                          const FileDescription& file_description,
-                                         io::FileSystem** broker_file_system,
-                                         io::FileReaderSPtr* reader) {
-    *broker_file_system = new io::BrokerFileSystem(broker_addr, prop, file_description.file_size);
-    RETURN_IF_ERROR((dynamic_cast<io::BrokerFileSystem*>(*broker_file_system))->connect());
-    RETURN_IF_ERROR((*broker_file_system)->open_file(file_description.path, reader));
+                                         std::shared_ptr<io::FileSystem>* broker_file_system,
+                                         io::FileReaderSPtr* reader,
+                                         const io::FileReaderOptions& reader_options,
+                                         IOContext* io_ctx) {
+    broker_file_system->reset(
+            new io::BrokerFileSystem(broker_addr, prop, file_description.file_size));
+    RETURN_IF_ERROR(
+            (std::static_pointer_cast<io::BrokerFileSystem>(*broker_file_system))->connect());
+    RETURN_IF_ERROR((*broker_file_system)
+                            ->open_file(file_description.path, reader_options, reader, io_ctx));
     return Status::OK();
 }
 } // namespace doris
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index d59aca61bf..63ab0f2a83 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -25,7 +25,8 @@
 namespace doris {
 namespace io {
 class FileSystem;
-}
+class FileReaderOptions;
+} // namespace io
 class ExecEnv;
 class TNetworkAddress;
 class RuntimeProfile;
@@ -73,8 +74,8 @@ public:
     static Status create_file_reader(RuntimeProfile* profile,
                                      const FileSystemProperties& system_properties,
                                      const FileDescription& file_description,
-                                     std::unique_ptr<io::FileSystem>* file_system,
-                                     io::FileReaderSPtr* file_reader);
+                                     std::shared_ptr<io::FileSystem>* file_system,
+                                     io::FileReaderSPtr* file_reader, IOContext* io_ctx);
 
     // Create FileReader for stream load pipe
     static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader);
@@ -84,20 +85,27 @@ public:
                                      std::shared_ptr<FileReader>& file_reader);
 
     static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
-                                     io::FileSystem** hdfs_file_system, io::FileReaderSPtr* reader);
+                                     std::shared_ptr<io::FileSystem>* hdfs_file_system,
+                                     io::FileReaderSPtr* reader,
+                                     const io::FileReaderOptions& reader_options,
+                                     IOContext* io_ctx);
 
     static Status create_hdfs_writer(const std::map<std::string, std::string>& properties,
                                      const std::string& path, std::unique_ptr<FileWriter>& writer);
 
     static Status create_s3_reader(const std::map<std::string, std::string>& prop,
-                                   const std::string& path, io::FileSystem** s3_file_system,
-                                   io::FileReaderSPtr* reader);
+                                   const std::string& path,
+                                   std::shared_ptr<io::FileSystem>* s3_file_system,
+                                   io::FileReaderSPtr* reader,
+                                   const io::FileReaderOptions& reader_options, IOContext* io_ctx);
 
     static Status create_broker_reader(const TNetworkAddress& broker_addr,
                                        const std::map<std::string, std::string>& prop,
                                        const FileDescription& file_description,
-                                       io::FileSystem** hdfs_file_system,
-                                       io::FileReaderSPtr* reader);
+                                       std::shared_ptr<io::FileSystem>* hdfs_file_system,
+                                       io::FileReaderSPtr* reader,
+                                       const io::FileReaderOptions& reader_options,
+                                       IOContext* io_ctx);
 
     static TFileType::type convert_storage_type(TStorageBackendType::type type) {
         switch (type) {
diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp
index 4ddd0ece2c..fe43d99749 100644
--- a/be/src/io/fs/broker_file_reader.cpp
+++ b/be/src/io/fs/broker_file_reader.cpp
@@ -27,9 +27,13 @@ namespace doris {
 namespace io {
 
 BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path,
-                                   size_t file_size, TBrokerFD fd, BrokerFileSystem* fs)
-        : _path(path), _file_size(file_size), _broker_addr(broker_addr), _fd(fd) {
-    fs->get_client(&_client);
+                                   size_t file_size, TBrokerFD fd,
+                                   std::shared_ptr<BrokerFileSystem> fs)
+        : _path(path),
+          _file_size(file_size),
+          _broker_addr(broker_addr),
+          _fd(fd),
+          _fs(std::move(fs)) {
     DorisMetrics::instance()->broker_file_open_reading->increment(1);
     DorisMetrics::instance()->broker_file_reader_total->increment(1);
 }
diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h
index 8f60d8266c..5e6261631e 100644
--- a/be/src/io/fs/broker_file_reader.h
+++ b/be/src/io/fs/broker_file_reader.h
@@ -22,6 +22,7 @@
 
 #include <atomic>
 
+#include "io/fs/broker_file_system.h"
 #include "io/fs/file_reader.h"
 #include "runtime/client_cache.h"
 namespace doris {
@@ -32,7 +33,7 @@ class BrokerFileSystem;
 class BrokerFileReader : public FileReader {
 public:
     BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path, size_t file_size,
-                     TBrokerFD fd, BrokerFileSystem* fs);
+                     TBrokerFD fd, std::shared_ptr<BrokerFileSystem> fs);
 
     ~BrokerFileReader() override;
 
@@ -47,6 +48,8 @@ public:
 
     bool closed() const override { return _closed.load(std::memory_order_acquire); }
 
+    FileSystemSPtr fs() const override { return _fs; }
+
 private:
     const Path& _path;
     size_t _file_size;
@@ -54,6 +57,7 @@ private:
     const TNetworkAddress& _broker_addr;
     TBrokerFD _fd;
 
+    std::shared_ptr<BrokerFileSystem> _fs;
     std::atomic<bool> _closed = false;
     std::shared_ptr<BrokerServiceConnection> _client;
 };
diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp
index af6cdcefdc..4407f3d686 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -77,7 +77,8 @@ Status BrokerFileSystem::connect() {
     return status;
 }
 
-Status BrokerFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
+Status BrokerFileSystem::open_file(const Path& path, FileReaderSPtr* reader,
+                                   IOContext* /*io_ctx*/) {
     CHECK_BROKER_CLIENT(_client);
     TBrokerOpenReaderRequest request;
     request.__set_version(TBrokerVersion::VERSION_ONE);
@@ -117,7 +118,9 @@ Status BrokerFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
         _file_size = response->size;
     }
     fd = response->fd;
-    *reader = std::make_shared<BrokerFileReader>(_broker_addr, path, _file_size, fd, this);
+    *reader = std::make_shared<BrokerFileReader>(
+            _broker_addr, path, _file_size, fd,
+            std::static_pointer_cast<BrokerFileSystem>(shared_from_this()));
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/broker_file_system.h b/be/src/io/fs/broker_file_system.h
index fc478cddb5..ec091ec577 100644
--- a/be/src/io/fs/broker_file_system.h
+++ b/be/src/io/fs/broker_file_system.h
@@ -33,7 +33,7 @@ public:
         return Status::NotSupported("Currently not support to create file through broker.");
     }
 
-    Status open_file(const Path& path, FileReaderSPtr* reader) override;
+    Status open_file(const Path& path, FileReaderSPtr* reader, IOContext* io_ctx) override;
 
     Status delete_file(const Path& path) override;
 
diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h
index 5d1fd1ef5c..542147bb90 100644
--- a/be/src/io/fs/file_reader.h
+++ b/be/src/io/fs/file_reader.h
@@ -17,8 +17,6 @@
 
 #pragma once
 
-#include <memory>
-
 #include "common/status.h"
 #include "gutil/macros.h"
 #include "io/fs/path.h"
@@ -30,6 +28,8 @@ struct IOContext;
 
 namespace io {
 
+class FileSystem;
+
 class FileReader {
 public:
     FileReader() = default;
@@ -47,6 +47,8 @@ public:
     virtual size_t size() const = 0;
 
     virtual bool closed() const = 0;
+
+    virtual std::shared_ptr<FileSystem> fs() const = 0;
 };
 
 using FileReaderSPtr = std::shared_ptr<FileReader>;
diff --git a/be/src/io/fs/file_reader_options.cpp b/be/src/io/fs/file_reader_options.cpp
index 00534d8c4e..0f8f74dbc2 100644
--- a/be/src/io/fs/file_reader_options.cpp
+++ b/be/src/io/fs/file_reader_options.cpp
@@ -20,15 +20,15 @@
 namespace doris {
 namespace io {
 
-FileCacheType cache_type_from_string(const std::string& type) {
+FileCachePolicy cache_type_from_string(const std::string& type) {
     if (type == "sub_file_cache") {
-        return FileCacheType::SUB_FILE_CACHE;
+        return FileCachePolicy::SUB_FILE_CACHE;
     } else if (type == "whole_file_cache") {
-        return FileCacheType::WHOLE_FILE_CACHE;
+        return FileCachePolicy::WHOLE_FILE_CACHE;
     } else if (type == "file_block_cache") {
-        return FileCacheType::FILE_BLOCK_CACHE;
+        return FileCachePolicy::FILE_BLOCK_CACHE;
     } else {
-        return FileCacheType::NO_CACHE;
+        return FileCachePolicy::NO_CACHE;
     }
 }
 
diff --git a/be/src/io/fs/file_reader_options.h b/be/src/io/fs/file_reader_options.h
index c4c0061704..f7cc0d13ab 100644
--- a/be/src/io/fs/file_reader_options.h
+++ b/be/src/io/fs/file_reader_options.h
@@ -17,21 +17,19 @@
 
 #pragma once
 
-#include <string>
-
-#include "common/status.h"
+#include "io/cache/block/block_file_cache.h"
 
 namespace doris {
 namespace io {
 
-enum class FileCacheType : uint8_t {
+enum class FileCachePolicy : uint8_t {
     NO_CACHE,
     SUB_FILE_CACHE,
     WHOLE_FILE_CACHE,
     FILE_BLOCK_CACHE,
 };
 
-FileCacheType cache_type_from_string(const std::string& type);
+FileCachePolicy cache_type_from_string(const std::string& type);
 
 // CachePathPolicy it to define which cache path should be used
 // for the local cache of the given file(path).
@@ -59,13 +57,19 @@ public:
     }
 };
 
+class FileBlockCachePathPolicy : public CachePathPolicy {
+public:
+    FileBlockCachePathPolicy() = default;
+    std::string get_cache_path(const std::string& path) const override { return path; }
+};
+
 class FileReaderOptions {
 public:
-    FileReaderOptions(FileCacheType cache_type_, const CachePathPolicy& path_policy_)
+    FileReaderOptions(FileCachePolicy cache_type_, const CachePathPolicy& path_policy_)
             : cache_type(cache_type_), path_policy(path_policy_) {}
 
-    FileCacheType cache_type;
-    CachePathPolicy path_policy;
+    FileCachePolicy cache_type;
+    const CachePathPolicy& path_policy;
 };
 
 } // namespace io
diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h
index 735e257930..4598ff32d2 100644
--- a/be/src/io/fs/file_system.h
+++ b/be/src/io/fs/file_system.h
@@ -42,7 +42,7 @@ enum class FileSystemType : uint8_t {
     BROKER,
 };
 
-class FileSystem {
+class FileSystem : public std::enable_shared_from_this<FileSystem> {
 public:
     FileSystem(Path&& root_path, ResourceId&& resource_id, FileSystemType type)
             : _root_path(std::move(root_path)), _resource_id(std::move(resource_id)), _type(type) {}
@@ -54,9 +54,9 @@ public:
     virtual Status create_file(const Path& path, FileWriterPtr* writer) = 0;
 
     virtual Status open_file(const Path& path, const FileReaderOptions& reader_options,
-                             FileReaderSPtr* reader) = 0;
+                             FileReaderSPtr* reader, IOContext* io_ctx) = 0;
 
-    virtual Status open_file(const Path& path, FileReaderSPtr* reader) = 0;
+    virtual Status open_file(const Path& path, FileReaderSPtr* reader, IOContext* io_ctx) = 0;
 
     virtual Status delete_file(const Path& path) = 0;
 
diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp
index 39c9795e95..738a6a3736 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -23,12 +23,12 @@
 namespace doris {
 namespace io {
 HdfsFileReader::HdfsFileReader(Path path, size_t file_size, const std::string& name_node,
-                               hdfsFile hdfs_file, HdfsFileSystem* fs)
+                               hdfsFile hdfs_file, std::shared_ptr<HdfsFileSystem> fs)
         : _path(std::move(path)),
           _file_size(file_size),
           _name_node(name_node),
           _hdfs_file(hdfs_file),
-          _fs(fs) {
+          _fs(std::move(fs)) {
     DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
     DorisMetrics::instance()->hdfs_file_reader_total->increment(1);
 }
diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h
index 94823fadd1..bd25b1d6ba 100644
--- a/be/src/io/fs/hdfs_file_reader.h
+++ b/be/src/io/fs/hdfs_file_reader.h
@@ -25,7 +25,7 @@ namespace io {
 class HdfsFileReader : public FileReader {
 public:
     HdfsFileReader(Path path, size_t file_size, const std::string& name_node, hdfsFile hdfs_file,
-                   HdfsFileSystem* fs);
+                   std::shared_ptr<HdfsFileSystem> fs);
 
     ~HdfsFileReader() override;
 
@@ -40,12 +40,14 @@ public:
 
     bool closed() const override { return _closed.load(std::memory_order_acquire); }
 
+    FileSystemSPtr fs() const override { return _fs; }
+
 private:
     Path _path;
     size_t _file_size;
     const std::string& _name_node;
     hdfsFile _hdfs_file;
-    HdfsFileSystem* _fs;
+    std::shared_ptr<HdfsFileSystem> _fs;
     std::atomic<bool> _closed = false;
 };
 } // namespace io
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index b053aeebb4..65330b8015 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -18,6 +18,7 @@
 #include "io/fs/hdfs_file_system.h"
 
 #include "gutil/hash/hash.h"
+#include "io/cache/block/cached_remote_file_reader.h"
 #include "io/fs/hdfs_file_reader.h"
 #include "io/hdfs_builder.h"
 #include "service/backend_options.h"
@@ -93,7 +94,7 @@ Status HdfsFileSystem::create_file(const Path& /*path*/, FileWriterPtr* /*writer
     return Status::NotSupported("Currently not support to create file to HDFS");
 }
 
-Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
+Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader, IOContext* /*io_ctx*/) {
     CHECK_HDFS_HANDLE(_fs_handle);
     size_t file_len = 0;
     RETURN_IF_ERROR(file_size(path, &file_len));
@@ -121,7 +122,9 @@ Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
                                          hdfsGetLastError());
         }
     }
-    *reader = std::make_shared<HdfsFileReader>(path, file_len, _namenode, hdfs_file, this);
+    *reader = std::make_shared<HdfsFileReader>(
+            path, file_len, _namenode, hdfs_file,
+            std::static_pointer_cast<HdfsFileSystem>(shared_from_this()));
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h
index 01e8da58ca..49f0cb6a25 100644
--- a/be/src/io/fs/hdfs_file_system.h
+++ b/be/src/io/fs/hdfs_file_system.h
@@ -86,7 +86,7 @@ public:
 
     Status create_file(const Path& path, FileWriterPtr* writer) override;
 
-    Status open_file(const Path& path, FileReaderSPtr* reader) override;
+    Status open_file(const Path& path, FileReaderSPtr* reader, IOContext* io_ctx) override;
 
     Status delete_file(const Path& path) override;
 
diff --git a/be/src/io/fs/local_file_reader.cpp b/be/src/io/fs/local_file_reader.cpp
index 1794115128..87e45699fb 100644
--- a/be/src/io/fs/local_file_reader.cpp
+++ b/be/src/io/fs/local_file_reader.cpp
@@ -20,13 +20,13 @@
 #include <atomic>
 
 #include "util/doris_metrics.h"
-#include "util/errno.h"
 
 namespace doris {
 namespace io {
 
-LocalFileReader::LocalFileReader(Path path, size_t file_size, int fd)
-        : _fd(fd), _path(std::move(path)), _file_size(file_size) {
+LocalFileReader::LocalFileReader(Path path, size_t file_size, int fd,
+                                 std::shared_ptr<LocalFileSystem> fs)
+        : _fd(fd), _path(std::move(path)), _file_size(file_size), _fs(std::move(fs)) {
     DorisMetrics::instance()->local_file_open_reading->increment(1);
     DorisMetrics::instance()->local_file_reader_total->increment(1);
 }
@@ -48,7 +48,7 @@ Status LocalFileReader::close() {
     return Status::OK();
 }
 
-Status LocalFileReader::read_at(size_t offset, Slice result, const IOContext& io_ctx,
+Status LocalFileReader::read_at(size_t offset, Slice result, const IOContext& /*io_ctx*/,
                                 size_t* bytes_read) {
     DCHECK(!closed());
     if (offset > _file_size) {
diff --git a/be/src/io/fs/local_file_reader.h b/be/src/io/fs/local_file_reader.h
index 1ed4fac3a5..dcb988db26 100644
--- a/be/src/io/fs/local_file_reader.h
+++ b/be/src/io/fs/local_file_reader.h
@@ -20,6 +20,7 @@
 #include <atomic>
 
 #include "io/fs/file_reader.h"
+#include "io/fs/local_file_system.h"
 #include "io/fs/path.h"
 
 namespace doris {
@@ -27,7 +28,7 @@ namespace io {
 
 class LocalFileReader final : public FileReader {
 public:
-    LocalFileReader(Path path, size_t file_size, int fd);
+    LocalFileReader(Path path, size_t file_size, int fd, std::shared_ptr<LocalFileSystem> fs);
 
     ~LocalFileReader() override;
 
@@ -42,11 +43,14 @@ public:
 
     bool closed() const override { return _closed.load(std::memory_order_acquire); }
 
+    FileSystemSPtr fs() const override { return _fs; }
+
 private:
     int _fd = -1; // owned
     Path _path;
     size_t _file_size;
     std::atomic<bool> _closed = false;
+    std::shared_ptr<LocalFileSystem> _fs;
 };
 
 } // namespace io
diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp
index 051c56a1ad..ea5efe17c4 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -46,7 +46,7 @@ Status LocalFileSystem::create_file(const Path& path, FileWriterPtr* writer) {
     return Status::OK();
 }
 
-Status LocalFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
+Status LocalFileSystem::open_file(const Path& path, FileReaderSPtr* reader, IOContext* /*io_ctx*/) {
     auto fs_path = absolute_path(path);
     size_t fsize = 0;
     RETURN_IF_ERROR(file_size(fs_path, &fsize));
@@ -55,7 +55,9 @@ Status LocalFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
     if (fd < 0) {
         return Status::IOError("cannot open {}: {}", fs_path.native(), std::strerror(errno));
     }
-    *reader = std::make_shared<LocalFileReader>(std::move(fs_path), fsize, fd);
+    *reader = std::make_shared<LocalFileReader>(
+            std::move(fs_path), fsize, fd,
+            std::static_pointer_cast<LocalFileSystem>(shared_from_this()));
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h
index 6d7803c375..d4b8e2e044 100644
--- a/be/src/io/fs/local_file_system.h
+++ b/be/src/io/fs/local_file_system.h
@@ -31,11 +31,11 @@ public:
     Status create_file(const Path& path, FileWriterPtr* writer) override;
 
     Status open_file(const Path& path, const FileReaderOptions& reader_options,
-                     FileReaderSPtr* reader) override {
-        return open_file(path, reader);
+                     FileReaderSPtr* reader, IOContext* io_ctx) override {
+        return open_file(path, reader, io_ctx);
     }
 
-    Status open_file(const Path& path, FileReaderSPtr* reader) override;
+    Status open_file(const Path& path, FileReaderSPtr* reader, IOContext* io_ctx) override;
 
     Status delete_file(const Path& path) override;
 
diff --git a/be/src/io/fs/path.h b/be/src/io/fs/path.h
index 9832ea6322..695d51063a 100644
--- a/be/src/io/fs/path.h
+++ b/be/src/io/fs/path.h
@@ -28,5 +28,11 @@ inline Path operator/(Path&& lhs, const Path& rhs) {
     return std::move(lhs /= rhs);
 }
 
+struct PathHasher {
+    std::size_t operator()(const doris::io::Path& k) const {
+        return std::hash<std::string>()(k.filename().native());
+    }
+};
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/remote_file_system.cpp b/be/src/io/fs/remote_file_system.cpp
index 871c325782..a2bfb93d66 100644
--- a/be/src/io/fs/remote_file_system.cpp
+++ b/be/src/io/fs/remote_file_system.cpp
@@ -18,6 +18,7 @@
 #include "io/fs/remote_file_system.h"
 
 #include "gutil/strings/stringpiece.h"
+#include "io/cache/block/cached_remote_file_reader.h"
 #include "io/cache/file_cache_manager.h"
 #include "io/fs/file_reader_options.h"
 
@@ -25,16 +26,16 @@ namespace doris {
 namespace io {
 
 Status RemoteFileSystem::open_file(const Path& path, const FileReaderOptions& reader_options,
-                                   FileReaderSPtr* reader) {
+                                   FileReaderSPtr* reader, IOContext* io_ctx) {
     FileReaderSPtr raw_reader;
-    RETURN_IF_ERROR(open_file(path, &raw_reader));
+    RETURN_IF_ERROR(open_file(path, &raw_reader, io_ctx));
     switch (reader_options.cache_type) {
-    case io::FileCacheType::NO_CACHE: {
+    case io::FileCachePolicy::NO_CACHE: {
         *reader = raw_reader;
         break;
     }
-    case io::FileCacheType::SUB_FILE_CACHE:
-    case io::FileCacheType::WHOLE_FILE_CACHE: {
+    case io::FileCachePolicy::SUB_FILE_CACHE:
+    case io::FileCachePolicy::WHOLE_FILE_CACHE: {
         StringPiece str(path.native());
         std::string cache_path = reader_options.path_policy.get_cache_path(str.as_string());
         io::FileCachePtr cache_reader = FileCacheManager::instance()->new_file_cache(
@@ -44,11 +45,15 @@ Status RemoteFileSystem::open_file(const Path& path, const FileReaderOptions& re
         *reader = cache_reader;
         break;
     }
-    case io::FileCacheType::FILE_BLOCK_CACHE: {
-        return Status::NotSupported("add file block cache reader");
+    case io::FileCachePolicy::FILE_BLOCK_CACHE: {
+        DCHECK(io_ctx);
+        StringPiece str(raw_reader->path().native());
+        std::string cache_path = reader_options.path_policy.get_cache_path(str.as_string());
+        *reader =
+                std::make_shared<CachedRemoteFileReader>(std::move(raw_reader), cache_path, io_ctx);
+        break;
     }
     default: {
-        // TODO: add file block cache reader
         return Status::InternalError("Unknown cache type: {}", reader_options.cache_type);
     }
     }
diff --git a/be/src/io/fs/remote_file_system.h b/be/src/io/fs/remote_file_system.h
index 2218ce40c3..195d424d93 100644
--- a/be/src/io/fs/remote_file_system.h
+++ b/be/src/io/fs/remote_file_system.h
@@ -37,9 +37,9 @@ public:
     virtual Status connect() = 0;
 
     Status open_file(const Path& path, const FileReaderOptions& reader_options,
-                     FileReaderSPtr* reader) override;
+                     FileReaderSPtr* reader, IOContext* io_ctx) override;
 
-    Status open_file(const Path& path, FileReaderSPtr* reader) override {
+    Status open_file(const Path& path, FileReaderSPtr* reader, IOContext* io_ctx) override {
         return Status::NotSupported("implemented in derived classes");
     }
 };
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index 38b18d71a5..3922bfd58f 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -27,10 +27,10 @@ namespace doris {
 namespace io {
 
 S3FileReader::S3FileReader(Path path, size_t file_size, std::string key, std::string bucket,
-                           S3FileSystem* fs)
+                           std::shared_ptr<S3FileSystem> fs)
         : _path(std::move(path)),
           _file_size(file_size),
-          _fs(fs),
+          _fs(std::move(fs)),
           _bucket(std::move(bucket)),
           _key(std::move(key)) {
     DorisMetrics::instance()->s3_file_open_reading->increment(1);
diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h
index 6d6e4666b2..6a719287ed 100644
--- a/be/src/io/fs/s3_file_reader.h
+++ b/be/src/io/fs/s3_file_reader.h
@@ -28,7 +28,7 @@ namespace io {
 class S3FileReader final : public FileReader {
 public:
     S3FileReader(Path path, size_t file_size, std::string key, std::string bucket,
-                 S3FileSystem* fs);
+                 std::shared_ptr<S3FileSystem> fs);
 
     ~S3FileReader() override;
 
@@ -43,10 +43,12 @@ public:
 
     bool closed() const override { return _closed.load(std::memory_order_acquire); }
 
+    FileSystemSPtr fs() const override { return _fs; }
+
 private:
     Path _path;
     size_t _file_size;
-    S3FileSystem* _fs;
+    std::shared_ptr<S3FileSystem> _fs;
 
     std::string _bucket;
     std::string _key;
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 68feb418e3..5796e68f4e 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -34,6 +34,7 @@
 #include "common/config.h"
 #include "common/status.h"
 #include "gutil/strings/stringpiece.h"
+#include "io/cache/block/cached_remote_file_reader.h"
 #include "io/fs/remote_file_system.h"
 #include "io/fs/s3_file_reader.h"
 #include "io/fs/s3_file_writer.h"
@@ -142,13 +143,14 @@ Status S3FileSystem::create_file(const Path& path, FileWriterPtr* writer) {
     return Status::OK();
 }
 
-Status S3FileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
+Status S3FileSystem::open_file(const Path& path, FileReaderSPtr* reader, IOContext* /*io_ctx*/) {
     size_t fsize = 0;
     RETURN_IF_ERROR(file_size(path, &fsize));
     auto key = get_key(path);
     auto fs_path = Path(_s3_conf.endpoint) / _s3_conf.bucket / key;
-    *reader = std::make_shared<S3FileReader>(std::move(fs_path), fsize, std::move(key),
-                                             _s3_conf.bucket, this);
+    *reader = std::make_shared<S3FileReader>(
+            std::move(fs_path), fsize, std::move(key), _s3_conf.bucket,
+            std::static_pointer_cast<S3FileSystem>(shared_from_this()));
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index 46510d3aa0..015b75908c 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -40,7 +40,7 @@ public:
 
     Status create_file(const Path& path, FileWriterPtr* writer) override;
 
-    Status open_file(const Path& path, FileReaderSPtr* reader) override;
+    Status open_file(const Path& path, FileReaderSPtr* reader, IOContext* io_ctx) override;
 
     Status delete_file(const Path& path) override;
 
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index 59d391d5ad..9aa42a027a 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -21,6 +21,7 @@
 #include <deque>
 
 #include "io/fs/file_reader.h"
+#include "io/fs/file_system.h"
 #include "runtime/message_body_sink.h"
 
 namespace doris {
@@ -65,6 +66,8 @@ public:
 
     Status read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length);
 
+    FileSystemSPtr fs() const override { return nullptr; }
+
 private:
     // read the next buffer from _buf_queue
     Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length);
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index d62b5feee9..d564b2b238 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -35,7 +35,21 @@ class Schema;
 class ColumnPredicate;
 
 struct IOContext {
+    IOContext() = default;
+
+    IOContext(const TUniqueId* query_id_, FileCacheStatistics* stats_, bool is_presistent_,
+              bool use_disposable_cache_, bool read_segment_index_)
+            : query_id(query_id_),
+              is_persistent(is_presistent_),
+              use_disposable_cache(use_disposable_cache_),
+              read_segment_index(read_segment_index_),
+              file_cache_stats(stats_) {}
     ReaderType reader_type;
+    const TUniqueId* query_id = nullptr;
+    bool is_persistent = false;
+    bool use_disposable_cache = false;
+    bool read_segment_index = false;
+    FileCacheStatistics* file_cache_stats = nullptr;
 };
 namespace vectorized {
 struct IteratorRowRef;
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 0cece99d6c..efb0f53b05 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -264,6 +264,17 @@ using KeyRange = std::pair<WrapperField*, WrapperField*>;
 
 static const int GENERAL_DEBUG_COUNT = 0;
 
+struct FileCacheStatistics {
+    int64_t num_io_total = 0;
+    int64_t num_io_hit_cache = 0;
+    int64_t num_io_bytes_read_total = 0;
+    int64_t num_io_bytes_read_from_file_cache = 0;
+    int64_t num_io_bytes_read_from_write_cache = 0;
+    int64_t num_io_written_in_file_cache = 0;
+    int64_t num_io_bytes_written_in_file_cache = 0;
+    int64_t num_io_bytes_skip_cache = 0;
+};
+
 // ReaderStatistics used to collect statistics when scan data from storage
 struct OlapReaderStatistics {
     int64_t io_ns = 0;
@@ -352,6 +363,9 @@ struct OlapReaderStatistics {
     // usage example:
     //               SCOPED_RAW_TIMER(&_stats->general_debug_ns[1]);
     int64_t general_debug_ns[GENERAL_DEBUG_COUNT] = {};
+
+    FileCacheStatistics file_cache_stats;
+    int64_t load_segments_timer = 0;
 };
 
 using ColumnId = uint32_t;
diff --git a/be/src/olap/options.cpp b/be/src/olap/options.cpp
index 49665555e9..bf8ddb10da 100644
--- a/be/src/olap/options.cpp
+++ b/be/src/olap/options.cpp
@@ -17,6 +17,8 @@
 
 #include "olap/options.h"
 
+#include <rapidjson/document.h>
+
 #include <algorithm>
 
 #include "common/config.h"
@@ -40,6 +42,11 @@ static std::string SSD_UC = "SSD";
 static std::string HDD_UC = "HDD";
 static std::string REMOTE_CACHE_UC = "REMOTE_CACHE";
 
+static std::string CACHE_PATH = "path";
+static std::string CACHE_NORMAL_SIZE = "normal";
+static std::string CACHE_PERSISTENT_SIZE = "persistent";
+static std::string CACHE_QUERY_LIMIT_SIZE = "query_limit";
+
 // TODO: should be a general util method
 static std::string to_upper(const std::string& str) {
     std::string out = str;
@@ -155,4 +162,62 @@ Status parse_conf_store_paths(const string& config_path, std::vector<StorePath>*
     return Status::OK();
 }
 
+/** format:   
+ *  [
+ *    {"path": "storage1", "normal":53687091200,"persistent":21474836480,"query_limit": "10737418240"},
+ *    {"path": "storage2", "normal":53687091200,"persistent":21474836480},
+ *    {"path": "storage3", "normal":53687091200,"persistent":21474836480},
+ *  ]
+ */
+Status parse_conf_cache_paths(const std::string& config_path, std::vector<CachePath>& paths) {
+    using namespace rapidjson;
+    Document document;
+    document.Parse(config_path.c_str());
+    DCHECK(document.IsArray()) << config_path << " " << document.GetType();
+    for (auto& config : document.GetArray()) {
+        auto map = config.GetObject();
+        DCHECK(map.HasMember(CACHE_PATH.c_str()));
+        std::string path = map.FindMember(CACHE_PATH.c_str())->value.GetString();
+        int64_t normal_size = map.HasMember(CACHE_NORMAL_SIZE.c_str())
+                                      ? map.FindMember(CACHE_NORMAL_SIZE.c_str())->value.GetInt64()
+                                      : 0;
+        int64_t persistent_size =
+                map.HasMember(CACHE_PERSISTENT_SIZE.c_str())
+                        ? map.FindMember(CACHE_PERSISTENT_SIZE.c_str())->value.GetInt64()
+                        : 0;
+        int64_t query_limit_bytes = 0;
+        if (config::enable_file_cache_query_limit) {
+            query_limit_bytes =
+                    map.HasMember(CACHE_QUERY_LIMIT_SIZE.c_str())
+                            ? map.FindMember(CACHE_QUERY_LIMIT_SIZE.c_str())->value.GetInt64()
+                            : normal_size / 5;
+        }
+        if (normal_size <= 0 || persistent_size <= 0) {
+            LOG(WARNING) << "normal or persistent size should not less than or equal to zero";
+            return Status::InternalError("OLAP_ERR_INPUT_PARAMETER_ERROR");
+        }
+        paths.emplace_back(std::move(path), normal_size, persistent_size, query_limit_bytes);
+    }
+    if (paths.empty()) {
+        LOG(WARNING) << "fail to parse storage_root_path config. value=[" << config_path << "]";
+        return Status::InternalError("OLAP_ERR_INPUT_PARAMETER_ERROR");
+    }
+    return Status::OK();
+}
+
+io::FileCacheSettings CachePath::init_settings() const {
+    io::FileCacheSettings settings;
+    settings.max_size = normal_bytes;
+    settings.persistent_max_size = persistent_bytes;
+    settings.max_file_segment_size = config::file_cache_max_file_segment_size;
+
+    settings.max_elements = std::max<size_t>(
+            normal_bytes / config::file_cache_max_file_segment_size, settings.max_elements);
+    settings.persistent_max_elements =
+            std::max<size_t>(persistent_bytes / config::file_cache_max_file_segment_size,
+                             settings.persistent_max_elements);
+    settings.max_query_cache_size = query_limit_bytes;
+    return settings;
+}
+
 } // end namespace doris
diff --git a/be/src/olap/options.h b/be/src/olap/options.h
index 41ab8ff3f0..c6ed214ceb 100644
--- a/be/src/olap/options.h
+++ b/be/src/olap/options.h
@@ -20,6 +20,7 @@
 #include <string>
 #include <vector>
 
+#include "io/cache/block/block_file_cache_settings.h"
 #include "olap/olap_define.h"
 #include "util/uid_util.h"
 
@@ -42,6 +43,22 @@ Status parse_root_path(const std::string& root_path, StorePath* path);
 
 Status parse_conf_store_paths(const std::string& config_path, std::vector<StorePath>* path);
 
+struct CachePath {
+    io::FileCacheSettings init_settings() const;
+    CachePath(std::string path, int64_t normal_bytes, int64_t persistent_bytes,
+              int64_t query_limit_bytes)
+            : path(std::move(path)),
+              normal_bytes(normal_bytes),
+              persistent_bytes(persistent_bytes),
+              query_limit_bytes(query_limit_bytes) {}
+    std::string path;
+    int64_t normal_bytes = 0;
+    int64_t persistent_bytes = 0;
+    int64_t query_limit_bytes = 0;
+};
+
+Status parse_conf_cache_paths(const std::string& config_path, std::vector<CachePath>& path);
+
 struct EngineOptions {
     // list paths that tablet will be put into.
     std::vector<StorePath> store_paths;
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index 53db362ed0..15ed949025 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -51,14 +51,14 @@ Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t se
                                          io::SegmentCachePathPolicy());
     io::FileReaderSPtr file_reader;
 #ifndef BE_TEST
-    RETURN_IF_ERROR(fs->open_file(path, reader_options, &file_reader));
+    RETURN_IF_ERROR(fs->open_file(path, reader_options, &file_reader, nullptr));
 #else
     // be ut use local file reader instead of remote file reader while use remote cache
     if (!config::file_cache_type.empty()) {
-        RETURN_IF_ERROR(
-                io::global_local_filesystem()->open_file(path, reader_options, &file_reader));
+        RETURN_IF_ERROR(io::global_local_filesystem()->open_file(path, reader_options, &file_reader,
+                                                                 nullptr));
     } else {
-        RETURN_IF_ERROR(fs->open_file(path, reader_options, &file_reader));
+        RETURN_IF_ERROR(fs->open_file(path, reader_options, &file_reader, nullptr));
     }
 #endif
 
diff --git a/be/src/io/fs/path.h b/be/src/service/brpc_conflict.h
similarity index 57%
copy from be/src/io/fs/path.h
copy to be/src/service/brpc_conflict.h
index 9832ea6322..35ef1b815c 100644
--- a/be/src/io/fs/path.h
+++ b/be/src/service/brpc_conflict.h
@@ -17,16 +17,32 @@
 
 #pragma once
 
-#include <filesystem>
+// This file is used to fixed macro conflict between butil and gutil
+// and this file must put the first include in source file
 
-namespace doris {
-namespace io {
+#include "gutil/macros.h"
+// Macros in the guti/macros.h, use butil's define
+#ifdef DISALLOW_IMPLICIT_CONSTRUCTORS
+#undef DISALLOW_IMPLICIT_CONSTRUCTORS
+#endif
 
-using Path = std::filesystem::path;
+#ifdef arraysize
+#undef arraysize
+#endif
 
-inline Path operator/(Path&& lhs, const Path& rhs) {
-    return std::move(lhs /= rhs);
-}
+#ifdef ARRAY_SIZE
+#undef ARRAY_SIZE
+#endif
 
-} // namespace io
-} // namespace doris
+#undef OVERRIDE
+#undef FINAL
+
+// use be/src/gutil/integral_types.h override butil/basictypes.h
+#include "gutil/integral_types.h"
+#ifdef BASE_INTEGRAL_TYPES_H_
+#define BUTIL_BASICTYPES_H_
+#endif
+
+#ifdef DEBUG_MODE
+#undef DEBUG_MODE
+#endif
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index a02d290e0d..f0355f76c3 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -47,6 +47,7 @@
 #include "common/status.h"
 #include "common/utils.h"
 #include "env/env.h"
+#include "io/cache/block/block_file_cache_factory.h"
 #include "olap/options.h"
 #include "olap/storage_engine.h"
 #include "runtime/exec_env.h"
@@ -388,6 +389,44 @@ int main(int argc, char** argv) {
     daemon.init(argc, argv, paths);
     daemon.start();
 
+    if (doris::config::enable_file_cache) {
+        std::vector<doris::CachePath> cache_paths;
+        olap_res = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
+        if (!olap_res) {
+            LOG(FATAL) << "parse config file cache path failed, path="
+                       << doris::config::file_cache_path;
+            exit(-1);
+        }
+        for (auto& cache_path : cache_paths) {
+            Status st = doris::io::FileCacheFactory::instance().create_file_cache(
+                    cache_path.path, cache_path.init_settings(), doris::io::FileCacheType::NORMAL);
+            if (!st) {
+                LOG(FATAL) << st;
+                exit(-1);
+            }
+        }
+
+        if (!doris::config::disposable_file_cache_path.empty()) {
+            cache_paths.clear();
+            olap_res = doris::parse_conf_cache_paths(doris::config::disposable_file_cache_path,
+                                                     cache_paths);
+            if (!olap_res) {
+                LOG(FATAL) << "parse config disposable file cache path failed, path="
+                           << doris::config::disposable_file_cache_path;
+                exit(-1);
+            }
+            for (auto& cache_path : cache_paths) {
+                Status st = doris::io::FileCacheFactory::instance().create_file_cache(
+                        cache_path.path, cache_path.init_settings(),
+                        doris::io::FileCacheType::DISPOSABLE);
+                if (!st) {
+                    LOG(FATAL) << st;
+                    exit(-1);
+                }
+            }
+        }
+    }
+
     doris::ResourceTls::init();
     if (!doris::BackendOptions::init()) {
         exit(-1);
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 119eb709b5..86c6a0347d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -340,6 +340,9 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
 
     std::unique_ptr<vectorized::GenericReader> reader(nullptr);
     std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema"));
+    IOContext io_ctx;
+    FileCacheStatistics file_cache_statis;
+    io_ctx.file_cache_stats = &file_cache_statis;
     switch (params.format_type) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
     case TFileFormatType::FORMAT_CSV_GZ:
@@ -349,21 +352,22 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
     case TFileFormatType::FORMAT_CSV_DEFLATE: {
         // file_slots is no use
         std::vector<SlotDescriptor*> file_slots;
-        reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots));
+        reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots, &io_ctx));
         break;
     }
     case TFileFormatType::FORMAT_PARQUET: {
-        reader.reset(new vectorized::ParquetReader(params, range));
+        reader.reset(new vectorized::ParquetReader(params, range, &io_ctx));
         break;
     }
     case TFileFormatType::FORMAT_ORC: {
         std::vector<std::string> column_names;
-        reader.reset(new vectorized::OrcReader(params, range, column_names, ""));
+        reader.reset(new vectorized::OrcReader(params, range, column_names, "", &io_ctx));
         break;
     }
     case TFileFormatType::FORMAT_JSON: {
         std::vector<SlotDescriptor*> file_slots;
-        reader.reset(new vectorized::NewJsonReader(profile.get(), params, range, file_slots));
+        reader.reset(
+                new vectorized::NewJsonReader(profile.get(), params, range, file_slots, &io_ctx));
         break;
     }
     default:
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 81e585441d..012e9bdfad 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -64,6 +64,7 @@ set(VEC_FILES
   common/sort/topn_sorter.cpp
   common/sort/vsort_exec_exprs.cpp
   common/string_utils/string_utils.cpp
+  common/hex.cpp
   core/block.cpp
   core/block_spill_reader.cpp
   core/block_spill_writer.cpp
diff --git a/be/src/vec/common/hex.cpp b/be/src/vec/common/hex.cpp
new file mode 100644
index 0000000000..612a8ec291
--- /dev/null
+++ b/be/src/vec/common/hex.cpp
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/hex.cpp
+// and modified by Doris
+
+#include "vec/common/hex.h"
+
+namespace doris::vectorized {
+
+const char* const hex_digit_to_char_uppercase_table = "0123456789ABCDEF";
+const char* const hex_digit_to_char_lowercase_table = "0123456789abcdef";
+
+const char* const hex_byte_to_char_uppercase_table =
+        "000102030405060708090A0B0C0D0E0F"
+        "101112131415161718191A1B1C1D1E1F"
+        "202122232425262728292A2B2C2D2E2F"
+        "303132333435363738393A3B3C3D3E3F"
+        "404142434445464748494A4B4C4D4E4F"
+        "505152535455565758595A5B5C5D5E5F"
+        "606162636465666768696A6B6C6D6E6F"
+        "707172737475767778797A7B7C7D7E7F"
+        "808182838485868788898A8B8C8D8E8F"
+        "909192939495969798999A9B9C9D9E9F"
+        "A0A1A2A3A4A5A6A7A8A9AAABACADAEAF"
+        "B0B1B2B3B4B5B6B7B8B9BABBBCBDBEBF"
+        "C0C1C2C3C4C5C6C7C8C9CACBCCCDCECF"
+        "D0D1D2D3D4D5D6D7D8D9DADBDCDDDEDF"
+        "E0E1E2E3E4E5E6E7E8E9EAEBECEDEEEF"
+        "F0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF";
+
+const char* const hex_byte_to_char_lowercase_table =
+        "000102030405060708090a0b0c0d0e0f"
+        "101112131415161718191a1b1c1d1e1f"
+        "202122232425262728292a2b2c2d2e2f"
+        "303132333435363738393a3b3c3d3e3f"
+        "404142434445464748494a4b4c4d4e4f"
+        "505152535455565758595a5b5c5d5e5f"
+        "606162636465666768696a6b6c6d6e6f"
+        "707172737475767778797a7b7c7d7e7f"
+        "808182838485868788898a8b8c8d8e8f"
+        "909192939495969798999a9b9c9d9e9f"
+        "a0a1a2a3a4a5a6a7a8a9aaabacadaeaf"
+        "b0b1b2b3b4b5b6b7b8b9babbbcbdbebf"
+        "c0c1c2c3c4c5c6c7c8c9cacbcccdcecf"
+        "d0d1d2d3d4d5d6d7d8d9dadbdcdddedf"
+        "e0e1e2e3e4e5e6e7e8e9eaebecedeeef"
+        "f0f1f2f3f4f5f6f7f8f9fafbfcfdfeff";
+
+const char* const hex_char_to_digit_table =
+        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
+        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
+        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
+        "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\xff\xff\xff\xff\xff\xff" //0-9
+        "\xff\x0a\x0b\x0c\x0d\x0e\x0f\xff\xff\xff\xff\xff\xff\xff\xff\xff" //A-Z
+        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
+        "\xff\x0a\x0b\x0c\x0d\x0e\x0f\xff\xff\xff\xff\xff\xff\xff\xff\xff" //a-z
+        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
+        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
+        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
+        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
+        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
+        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
+        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
+        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
+        "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff";
+
+const char* const bin_byte_to_char_table =
+        "0000000000000001000000100000001100000100000001010000011000000111"
+        "0000100000001001000010100000101100001100000011010000111000001111"
+        "0001000000010001000100100001001100010100000101010001011000010111"
+        "0001100000011001000110100001101100011100000111010001111000011111"
+        "0010000000100001001000100010001100100100001001010010011000100111"
+        "0010100000101001001010100010101100101100001011010010111000101111"
+        "0011000000110001001100100011001100110100001101010011011000110111"
+        "0011100000111001001110100011101100111100001111010011111000111111"
+        "0100000001000001010000100100001101000100010001010100011001000111"
+        "0100100001001001010010100100101101001100010011010100111001001111"
+        "0101000001010001010100100101001101010100010101010101011001010111"
+        "0101100001011001010110100101101101011100010111010101111001011111"
+        "0110000001100001011000100110001101100100011001010110011001100111"
+        "0110100001101001011010100110101101101100011011010110111001101111"
+        "0111000001110001011100100111001101110100011101010111011001110111"
+        "0111100001111001011110100111101101111100011111010111111001111111"
+        "1000000010000001100000101000001110000100100001011000011010000111"
+        "1000100010001001100010101000101110001100100011011000111010001111"
+        "1001000010010001100100101001001110010100100101011001011010010111"
+        "1001100010011001100110101001101110011100100111011001111010011111"
+        "1010000010100001101000101010001110100100101001011010011010100111"
+        "1010100010101001101010101010101110101100101011011010111010101111"
+        "1011000010110001101100101011001110110100101101011011011010110111"
+        "1011100010111001101110101011101110111100101111011011111010111111"
+        "1100000011000001110000101100001111000100110001011100011011000111"
+        "1100100011001001110010101100101111001100110011011100111011001111"
+        "1101000011010001110100101101001111010100110101011101011011010111"
+        "1101100011011001110110101101101111011100110111011101111011011111"
+        "1110000011100001111000101110001111100100111001011110011011100111"
+        "1110100011101001111010101110101111101100111011011110111011101111"
+        "1111000011110001111100101111001111110100111101011111011011110111"
+        "1111100011111001111110101111101111111100111111011111111011111111";
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/common/hex.h b/be/src/vec/common/hex.h
new file mode 100644
index 0000000000..6d2d72bb42
--- /dev/null
+++ b/be/src/vec/common/hex.h
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/hex.h
+// and modified by Doris
+
+#pragma once
+#include <cstddef>
+#include <cstring>
+#include <string>
+
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+
+/// Maps 0..15 to 0..9A..F or 0..9a..f correspondingly.
+
+extern const char* const hex_digit_to_char_uppercase_table;
+extern const char* const hex_digit_to_char_lowercase_table;
+
+inline char hex_digit_uppercase(unsigned char c) {
+    return hex_digit_to_char_uppercase_table[c];
+}
+
+inline char hex_digit_lowercase(unsigned char c) {
+    return hex_digit_to_char_lowercase_table[c];
+}
+
+/// Maps 0..255 to 00..FF or 00..ff correspondingly
+
+extern const char* const hex_byte_to_char_uppercase_table;
+extern const char* const hex_byte_to_char_lowercase_table;
+
+inline void write_hex_byte_uppercase(UInt8 byte, void* out) {
+    memcpy(out, &hex_byte_to_char_uppercase_table[static_cast<size_t>(byte) * 2], 2);
+}
+
+inline void write_hex_byte_lowercase(UInt8 byte, void* out) {
+    memcpy(out, &hex_byte_to_char_lowercase_table[static_cast<size_t>(byte) * 2], 2);
+}
+
+extern const char* const bin_byte_to_char_table;
+
+inline void write_bin_byte(UInt8 byte, void* out) {
+    memcpy(out, &bin_byte_to_char_table[static_cast<size_t>(byte) * 8], 8);
+}
+
+/// Produces hex representation of an unsigned int with leading zeros (for checksums)
+template <typename TUInt>
+inline void write_hex_uint_impl(TUInt uint_, char* out, const char* const table) {
+    union {
+        TUInt value;
+        UInt8 uint8[sizeof(TUInt)];
+    };
+
+    value = uint_;
+
+    /// Use little endian
+    for (size_t i = 0; i < sizeof(TUInt); ++i) {
+        memcpy(out + i * 2, &table[static_cast<size_t>(uint8[sizeof(TUInt) - 1 - i]) * 2], 2);
+    }
+}
+
+template <typename TUInt>
+inline void write_hex_uint_uppercase(TUInt uint_, char* out) {
+    write_hex_uint_impl(uint_, out, hex_byte_to_char_uppercase_table);
+}
+
+template <typename TUInt>
+inline void write_hex_uint_lowercase(TUInt uint_, char* out) {
+    write_hex_uint_impl(uint_, out, hex_byte_to_char_lowercase_table);
+}
+
+template <typename TUInt>
+std::string get_hex_uint_uppercase(TUInt uint_) {
+    std::string res(sizeof(TUInt) * 2, '\0');
+    write_hex_uint_uppercase(uint_, res.data());
+    return res;
+}
+
+template <typename TUInt>
+std::string get_hex_uint_lowercase(TUInt uint_) {
+    std::string res(sizeof(TUInt) * 2, '\0');
+    write_hex_uint_lowercase(uint_, res.data());
+    return res;
+}
+
+/// Maps 0..9, A..F, a..f to 0..15. Other chars are mapped to implementation specific value.
+
+extern const char* const hex_char_to_digit_table;
+
+inline UInt8 unhex(char c) {
+    return hex_char_to_digit_table[static_cast<UInt8>(c)];
+}
+
+inline UInt8 unhex2(const char* data) {
+    return static_cast<UInt8>(unhex(data[0])) * 0x10 + static_cast<UInt8>(unhex(data[1]));
+}
+
+inline UInt16 unhex4(const char* data) {
+    return static_cast<UInt16>(unhex(data[0])) * 0x1000 +
+           static_cast<UInt16>(unhex(data[1])) * 0x100 +
+           static_cast<UInt16>(unhex(data[2])) * 0x10 + static_cast<UInt16>(unhex(data[3]));
+}
+
+template <typename TUInt>
+TUInt unhex_uint(const char* data) {
+    TUInt res = TUInt(0);
+    if constexpr ((sizeof(TUInt) <= 8) || ((sizeof(TUInt) % 8) != 0)) {
+        for (size_t i = 0; i < sizeof(TUInt) * 2; ++i, ++data) {
+            res <<= 4;
+            res += unhex(*data);
+        }
+    } else {
+        for (size_t i = 0; i < sizeof(TUInt) / 8; ++i, data += 16) {
+            res <<= TUInt(64);
+            res += TUInt(unhex_uint<UInt64>(data));
+        }
+    }
+    return res;
+}
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/common/uint128.h b/be/src/vec/common/uint128.h
index 039572b17d..bb7c1a64af 100644
--- a/be/src/vec/common/uint128.h
+++ b/be/src/vec/common/uint128.h
@@ -63,6 +63,38 @@ struct UInt128 {
     bool operator>(const UInt128 rhs) const { return tuple() > rhs.tuple(); }
     bool operator>=(const UInt128 rhs) const { return tuple() >= rhs.tuple(); }
 
+    UInt128 operator<<(const UInt128& rhs) const {
+        const uint64_t shift = rhs.low;
+        if (((bool)rhs.high) || (shift >= 128)) {
+            return UInt128(0);
+        } else if (shift == 64) {
+            return UInt128(0, low);
+        } else if (shift == 0) {
+            return *this;
+        } else if (shift < 64) {
+            return UInt128(low << shift, (high << shift) + (low >> (64 - shift)));
+        } else if ((128 > shift) && (shift > 64)) {
+            return UInt128(0, low << (shift - 64));
+        } else {
+            return UInt128(0);
+        }
+    }
+
+    UInt128& operator<<=(const UInt128& rhs) {
+        *this = *this << rhs;
+        return *this;
+    }
+
+    UInt128 operator+(const UInt128& rhs) const {
+        return UInt128(low + rhs.low, high + rhs.high + ((low + rhs.low) < low));
+    }
+
+    UInt128& operator+=(const UInt128& rhs) {
+        high += rhs.high + ((low + rhs.low) < low);
+        low += rhs.low;
+        return *this;
+    }
+
     template <typename T>
     bool operator==(const T rhs) const {
         return *this == UInt128(rhs);
diff --git a/be/src/vec/core/block_spill_reader.cpp b/be/src/vec/core/block_spill_reader.cpp
index 463bba1bf4..58aaa3cbf5 100644
--- a/be/src/vec/core/block_spill_reader.cpp
+++ b/be/src/vec/core/block_spill_reader.cpp
@@ -24,15 +24,16 @@
 namespace doris {
 namespace vectorized {
 Status BlockSpillReader::open() {
-    std::unique_ptr<io::FileSystem> file_system;
+    std::shared_ptr<io::FileSystem> file_system;
     FileSystemProperties system_properties;
     system_properties.system_type = TFileType::FILE_LOCAL;
 
     FileDescription file_description;
     file_description.path = file_path_;
 
+    IOContext io_ctx;
     RETURN_IF_ERROR(FileFactory::create_file_reader(nullptr, system_properties, file_description,
-                                                    &file_system, &file_reader_));
+                                                    &file_system, &file_reader_, &io_ctx));
 
     size_t file_size = file_reader_->size();
 
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 38fd30ba74..ef123c6eaa 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -26,6 +26,8 @@
 #include "exec/text_converter.h"
 #include "exec/text_converter.hpp"
 #include "io/file_factory.h"
+#include "olap/iterators.h"
+#include "olap/olap_common.h"
 #include "util/string_util.h"
 #include "util/utf8_check.h"
 #include "vec/core/block.h"
@@ -39,7 +41,7 @@ const static Slice _s_null_slice = Slice("\\N");
 
 CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
                      const TFileScanRangeParams& params, const TFileRangeDesc& range,
-                     const std::vector<SlotDescriptor*>& file_slot_descs)
+                     const std::vector<SlotDescriptor*>& file_slot_descs, IOContext* io_ctx)
         : _state(state),
           _profile(profile),
           _counter(counter),
@@ -52,7 +54,8 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte
           _line_reader_eof(false),
           _text_converter(nullptr),
           _decompressor(nullptr),
-          _skip_lines(0) {
+          _skip_lines(0),
+          _io_ctx(io_ctx) {
     _file_format_type = _params.format_type;
     _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
     _file_compress_type = _params.compress_type;
@@ -64,7 +67,7 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte
 
 CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
                      const TFileRangeDesc& range,
-                     const std::vector<SlotDescriptor*>& file_slot_descs)
+                     const std::vector<SlotDescriptor*>& file_slot_descs, IOContext* io_ctx)
         : _state(nullptr),
           _profile(profile),
           _params(params),
@@ -73,7 +76,8 @@ CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params
           _line_reader(nullptr),
           _line_reader_eof(false),
           _text_converter(nullptr),
-          _decompressor(nullptr) {
+          _decompressor(nullptr),
+          _io_ctx(io_ctx) {
     _file_format_type = _params.format_type;
     _file_compress_type = _params.compress_type;
     _size = _range.size;
@@ -122,8 +126,9 @@ Status CsvReader::init_reader(bool is_load) {
     if (_params.file_type == TFileType::FILE_STREAM) {
         RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader));
     } else {
-        RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, system_properties, file_description, &_file_system, &_file_reader));
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, system_properties,
+                                                        file_description, &_file_system,
+                                                        &_file_reader, _io_ctx));
     }
     if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
         _params.file_type != TFileType::FILE_BROKER) {
@@ -564,7 +569,7 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
     file_description.file_size = _range.__isset.file_size ? _range.file_size : 0;
 
     RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, system_properties, file_description,
-                                                    &_file_system, &_file_reader));
+                                                    &_file_system, &_file_reader, _io_ctx));
     if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
         _params.file_type != TFileType::FILE_BROKER) {
         return Status::EndOfFile("Empty File");
diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h
index c237958d07..a2fba641da 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "io/fs/file_reader.h"
+#include "olap/iterators.h"
 #include "vec/exec/format/generic_reader.h"
 
 namespace doris {
@@ -34,10 +35,11 @@ class CsvReader : public GenericReader {
 public:
     CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
               const TFileScanRangeParams& params, const TFileRangeDesc& range,
-              const std::vector<SlotDescriptor*>& file_slot_descs);
+              const std::vector<SlotDescriptor*>& file_slot_descs, IOContext* io_ctx);
 
     CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
-              const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs);
+              const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs,
+              IOContext* io_ctx);
     ~CsvReader() override;
 
     Status init_reader(bool is_query);
@@ -91,7 +93,7 @@ private:
     // True if this is a load task
     bool _is_load = false;
 
-    std::unique_ptr<io::FileSystem> _file_system;
+    std::shared_ptr<io::FileSystem> _file_system;
     io::FileReaderSPtr _file_reader;
     std::unique_ptr<LineReader> _line_reader;
     bool _line_reader_eof;
@@ -113,6 +115,8 @@ private:
     int _line_delimiter_length;
     bool _trim_double_quotes = false;
 
+    IOContext* _io_ctx;
+
     // save source text which have been splitted.
     std::vector<Slice> _split_values;
 };
diff --git a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp
index 016d29ca1d..ca80428c38 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp
@@ -38,8 +38,6 @@ void NewPlainBinaryLineReader::close() {}
 Status NewPlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size, bool* eof) {
     std::unique_ptr<uint8_t[]> file_buf;
     size_t read_size = 0;
-    IOContext io_ctx;
-    io_ctx.reader_type = READER_QUERY;
     switch (_file_type) {
     case TFileType::FILE_LOCAL:
     case TFileType::FILE_HDFS:
@@ -47,6 +45,7 @@ Status NewPlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size, bo
         size_t file_size = _file_reader->size();
         file_buf.reset(new uint8_t[file_size]);
         Slice result(file_buf.get(), file_size);
+        IOContext io_ctx;
         RETURN_IF_ERROR(_file_reader->read_at(0, result, io_ctx, &read_size));
         break;
     }
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index 932a792736..d0d3acf284 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -32,7 +32,8 @@ using namespace ErrorCode;
 
 NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
                              const TFileScanRangeParams& params, const TFileRangeDesc& range,
-                             const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof)
+                             const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof,
+                             IOContext* io_ctx)
         : _vhandle_json_callback(nullptr),
           _state(state),
           _profile(profile),
@@ -51,7 +52,8 @@ NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, Scann
           _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
           _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator),
           _scanner_eof(scanner_eof),
-          _current_offset(0) {
+          _current_offset(0),
+          _io_ctx(io_ctx) {
     _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
     _read_timer = ADD_TIMER(_profile, "ReadTime");
     _file_read_timer = ADD_TIMER(_profile, "FileReadTime");
@@ -59,7 +61,7 @@ NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, Scann
 
 NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
                              const TFileRangeDesc& range,
-                             const std::vector<SlotDescriptor*>& file_slot_descs)
+                             const std::vector<SlotDescriptor*>& file_slot_descs, IOContext* io_ctx)
         : _vhandle_json_callback(nullptr),
           _state(nullptr),
           _profile(profile),
@@ -73,7 +75,8 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams
           _total_rows(0),
           _value_allocator(_value_buffer, sizeof(_value_buffer)),
           _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
-          _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator) {}
+          _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator),
+          _io_ctx(io_ctx) {}
 
 Status NewJsonReader::init_reader() {
     RETURN_IF_ERROR(_get_range_params());
@@ -304,8 +307,9 @@ Status NewJsonReader::_open_file_reader() {
     if (_params.file_type == TFileType::FILE_STREAM) {
         RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader));
     } else {
-        RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, system_properties, file_description, &_file_system, &_file_reader));
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, system_properties,
+                                                        file_description, &_file_system,
+                                                        &_file_reader, _io_ctx));
     }
     return Status::OK();
 }
@@ -891,8 +895,6 @@ std::string NewJsonReader::_print_json_value(const rapidjson::Value& value) {
 }
 
 Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size) {
-    IOContext io_ctx;
-    io_ctx.reader_type = READER_QUERY;
     switch (_params.file_type) {
     case TFileType::FILE_LOCAL:
     case TFileType::FILE_HDFS:
@@ -900,7 +902,7 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
         size_t file_size = _file_reader->size();
         file_buf->reset(new uint8_t[file_size]);
         Slice result(file_buf->get(), file_size);
-        RETURN_IF_ERROR(_file_reader->read_at(_current_offset, result, io_ctx, read_size));
+        RETURN_IF_ERROR(_file_reader->read_at(_current_offset, result, *_io_ctx, read_size));
         break;
     }
     case TFileType::FILE_STREAM: {
diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h
index 98aae55ea4..b67f9393e1 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -23,6 +23,7 @@
 #include <rapidjson/writer.h>
 
 #include "io/fs/file_reader.h"
+#include "olap/iterators.h"
 #include "vec/exec/format/generic_reader.h"
 namespace doris {
 
@@ -39,10 +40,12 @@ class NewJsonReader : public GenericReader {
 public:
     NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
                   const TFileScanRangeParams& params, const TFileRangeDesc& range,
-                  const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof);
+                  const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof,
+                  IOContext* io_ctx);
 
     NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
-                  const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs);
+                  const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs,
+                  IOContext* io_ctx);
     ~NewJsonReader() override = default;
 
     Status init_reader();
@@ -105,7 +108,7 @@ private:
     const TFileRangeDesc& _range;
     const std::vector<SlotDescriptor*>& _file_slot_descs;
 
-    std::unique_ptr<io::FileSystem> _file_system;
+    std::shared_ptr<io::FileSystem> _file_system;
     io::FileReaderSPtr _file_reader;
     std::unique_ptr<LineReader> _line_reader;
     bool _reader_eof;
@@ -144,6 +147,8 @@ private:
 
     size_t _current_offset;
 
+    IOContext* _io_ctx;
+
     RuntimeProfile::Counter* _bytes_read_counter;
     RuntimeProfile::Counter* _read_timer;
     RuntimeProfile::Counter* _file_read_timer;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 0d4fac98f8..0c27d78add 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -72,7 +72,7 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
 
 OrcReader::OrcReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
                      const TFileRangeDesc& range, const std::vector<std::string>& column_names,
-                     size_t batch_size, const std::string& ctz)
+                     size_t batch_size, const std::string& ctz, IOContext* io_ctx)
         : _profile(profile),
           _scan_params(params),
           _scan_range(range),
@@ -80,19 +80,22 @@ OrcReader::OrcReader(RuntimeProfile* profile, const TFileScanRangeParams& params
           _range_start_offset(range.start_offset),
           _range_size(range.size),
           _ctz(ctz),
-          _column_names(column_names) {
+          _column_names(column_names),
+          _io_ctx(io_ctx) {
     TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
     _init_profile();
 }
 
 OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
-                     const std::vector<std::string>& column_names, const std::string& ctz)
+                     const std::vector<std::string>& column_names, const std::string& ctz,
+                     IOContext* io_ctx)
         : _profile(nullptr),
           _scan_params(params),
           _scan_range(range),
           _ctz(ctz),
           _column_names(column_names),
-          _file_system(nullptr) {}
+          _file_system(nullptr),
+          _io_ctx(io_ctx) {}
 
 OrcReader::~OrcReader() {
     close();
@@ -153,8 +156,9 @@ Status OrcReader::init_reader(
         file_description.start_offset = _scan_range.start_offset;
         file_description.file_size = _scan_range.__isset.file_size ? _scan_range.file_size : 0;
 
-        RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, system_properties, file_description, &_file_system, &inner_reader));
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, system_properties,
+                                                        file_description, &_file_system,
+                                                        &inner_reader, _io_ctx));
 
         _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader);
     }
@@ -210,8 +214,9 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
         file_description.start_offset = _scan_range.start_offset;
         file_description.file_size = _scan_range.__isset.file_size ? _scan_range.file_size : 0;
 
-        RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, system_properties, file_description, &_file_system, &inner_reader));
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, system_properties,
+                                                        file_description, &_file_system,
+                                                        &inner_reader, _io_ctx));
 
         _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader);
     }
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 4f5f4dba10..74ef9977a4 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -70,10 +70,11 @@ public:
 
     OrcReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
               const TFileRangeDesc& range, const std::vector<std::string>& column_names,
-              size_t batch_size, const std::string& ctz);
+              size_t batch_size, const std::string& ctz, IOContext* io_ctx);
 
     OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
-              const std::vector<std::string>& column_names, const std::string& ctz);
+              const std::vector<std::string>& column_names, const std::string& ctz,
+              IOContext* io_ctx);
 
     ~OrcReader() override;
     // for test
@@ -278,7 +279,9 @@ private:
     orc::ReaderOptions _reader_options;
     orc::RowReaderOptions _row_reader_options;
 
-    std::unique_ptr<io::FileSystem> _file_system;
+    std::shared_ptr<io::FileSystem> _file_system;
+
+    IOContext* _io_ctx;
 
     // only for decimal
     DecimalScaleParams _decimal_scale_params;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 7e8635dd5d..0c6bfc085a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -32,19 +32,22 @@
 namespace doris::vectorized {
 
 ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
-                             const TFileRangeDesc& range, size_t batch_size, cctz::time_zone* ctz)
+                             const TFileRangeDesc& range, size_t batch_size, cctz::time_zone* ctz,
+                             IOContext* io_ctx)
         : _profile(profile),
           _scan_params(params),
           _scan_range(range),
           _batch_size(batch_size),
           _range_start_offset(range.start_offset),
           _range_size(range.size),
-          _ctz(ctz) {
+          _ctz(ctz),
+          _io_ctx(io_ctx) {
     _init_profile();
 }
 
-ParquetReader::ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range)
-        : _profile(nullptr), _scan_params(params), _scan_range(range) {}
+ParquetReader::ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
+                             IOContext* io_ctx)
+        : _profile(nullptr), _scan_params(params), _scan_range(range), _io_ctx(io_ctx) {}
 
 ParquetReader::~ParquetReader() {
     close();
@@ -151,8 +154,9 @@ Status ParquetReader::_open_file() {
     file_description.file_size = _scan_range.__isset.file_size ? _scan_range.file_size : 0;
 
     if (_file_reader == nullptr) {
-        RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, system_properties, file_description, &_file_system, &_file_reader));
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, system_properties,
+                                                        file_description, &_file_system,
+                                                        &_file_reader, _io_ctx));
     }
     if (_file_metadata == nullptr) {
         if (_file_reader->size() == 0) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 9871b512bd..3caa12d14e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -56,9 +56,11 @@ public:
     };
 
     ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
-                  const TFileRangeDesc& range, size_t batch_size, cctz::time_zone* ctz);
+                  const TFileRangeDesc& range, size_t batch_size, cctz::time_zone* ctz,
+                  IOContext* io_ctx);
 
-    ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range);
+    ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
+                  IOContext* io_ctx);
 
     ~ParquetReader() override;
     // for test
@@ -154,7 +156,7 @@ private:
     RuntimeProfile* _profile;
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
-    std::unique_ptr<io::FileSystem> _file_system = nullptr;
+    std::shared_ptr<io::FileSystem> _file_system = nullptr;
     io::FileReaderSPtr _file_reader = nullptr;
     std::shared_ptr<FileMetaData> _file_metadata;
     const tparquet::FileMetaData* _t_metadata;
@@ -187,5 +189,7 @@ private:
     ParquetColumnReader::Statistics _column_statistics;
     ParquetProfile _parquet_profile;
     bool _closed = false;
+
+    IOContext* _io_ctx;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 533aef6d86..712d43324b 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -18,6 +18,7 @@
 #include "iceberg_reader.h"
 
 #include "common/status.h"
+#include "olap/iterators.h"
 #include "vec/common/assert_cast.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/data_types/data_type_factory.hpp"
@@ -37,13 +38,15 @@ const std::string ICEBERG_FILE_PATH = "file_path";
 
 IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
                                        RuntimeState* state, const TFileScanRangeParams& params,
-                                       const TFileRangeDesc& range, KVCache<std::string>& kv_cache)
+                                       const TFileRangeDesc& range, KVCache<std::string>& kv_cache,
+                                       IOContext* io_ctx)
         : TableFormatReader(file_format_reader),
           _profile(profile),
           _state(state),
           _params(params),
           _range(range),
-          _kv_cache(kv_cache) {
+          _kv_cache(kv_cache),
+          _io_ctx(io_ctx) {
     static const char* iceberg_profile = "IcebergProfile";
     ADD_TIMER(_profile, iceberg_profile);
     _iceberg_profile.num_delete_files =
@@ -129,7 +132,8 @@ Status IcebergTableReader::_position_delete(
             delete_range.size = -1;
             delete_range.file_size = -1;
             ParquetReader delete_reader(_profile, _params, delete_range, 102400,
-                                        const_cast<cctz::time_zone*>(&_state->timezone_obj()));
+                                        const_cast<cctz::time_zone*>(&_state->timezone_obj()),
+                                        _io_ctx);
             if (!init_schema) {
                 delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types);
                 init_schema = true;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index 301546b6df..e4757ed804 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -25,7 +25,11 @@
 #include "vec/exec/format/parquet/parquet_common.h"
 #include "vec/exprs/vexpr.h"
 
-namespace doris::vectorized {
+namespace doris {
+
+struct IOContext;
+
+namespace vectorized {
 
 class IcebergTableReader : public TableFormatReader {
 public:
@@ -36,7 +40,8 @@ public:
 
     IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
                        RuntimeState* state, const TFileScanRangeParams& params,
-                       const TFileRangeDesc& range, KVCache<std::string>& kv_cache);
+                       const TFileRangeDesc& range, KVCache<std::string>& kv_cache,
+                       IOContext* io_ctx);
     ~IcebergTableReader() override = default;
 
     Status init_row_filters(const TFileRangeDesc& range) override;
@@ -83,6 +88,8 @@ private:
     KVCache<std::string>& _kv_cache;
     IcebergProfile _iceberg_profile;
     std::vector<int64_t> _delete_rows;
-};
 
-} // namespace doris::vectorized
+    IOContext* _io_ctx;
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 292235db9a..44d541a4a8 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -27,6 +27,7 @@
 #include "exec/arrow/orc_reader.h"
 #include "exec/text_converter.hpp"
 #include "exprs/expr_context.h"
+#include "olap/iterators.h"
 #include "runtime/descriptors.h"
 #include "runtime/raw_value.h"
 #include "runtime/runtime_state.h"
@@ -75,6 +76,11 @@ Status VFileScanner::prepare(
     _convert_to_output_block_timer =
             ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime");
 
+    _file_cache_statistics.reset(new FileCacheStatistics());
+    _io_ctx.reset(new IOContext());
+    _io_ctx->file_cache_stats = _file_cache_statistics.get();
+    _io_ctx->query_id = &_state->query_id();
+
     if (vconjunct_ctx_ptr != nullptr) {
         // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx.
         RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
@@ -483,9 +489,9 @@ Status VFileScanner::_get_next_reader() {
         // TODO: use data lake type
         switch (_params.format_type) {
         case TFileFormatType::FORMAT_PARQUET: {
-            ParquetReader* parquet_reader =
-                    new ParquetReader(_profile, _params, range, _state->query_options().batch_size,
-                                      const_cast<cctz::time_zone*>(&_state->timezone_obj()));
+            ParquetReader* parquet_reader = new ParquetReader(
+                    _profile, _params, range, _state->query_options().batch_size,
+                    const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get());
             if (!_is_load && _push_down_expr == nullptr && _vconjunct_ctx != nullptr) {
                 RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, &_push_down_expr));
                 _discard_conjuncts();
@@ -496,7 +502,7 @@ Status VFileScanner::_get_next_reader() {
                 range.table_format_params.table_format_type == "iceberg") {
                 IcebergTableReader* iceberg_reader =
                         new IcebergTableReader((GenericReader*)parquet_reader, _profile, _state,
-                                               _params, range, _kv_cache);
+                                               _params, range, _kv_cache, _io_ctx.get());
                 RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
                 _cur_reader.reset((GenericReader*)iceberg_reader);
             } else {
@@ -506,8 +512,8 @@ Status VFileScanner::_get_next_reader() {
         }
         case TFileFormatType::FORMAT_ORC: {
             _cur_reader.reset(new OrcReader(_profile, _params, range, _file_col_names,
-                                            _state->query_options().batch_size,
-                                            _state->timezone()));
+                                            _state->query_options().batch_size, _state->timezone(),
+                                            _io_ctx.get()));
             init_status = ((OrcReader*)(_cur_reader.get()))->init_reader(_colname_to_value_range);
             break;
         }
@@ -518,14 +524,14 @@ Status VFileScanner::_get_next_reader() {
         case TFileFormatType::FORMAT_CSV_LZOP:
         case TFileFormatType::FORMAT_CSV_DEFLATE:
         case TFileFormatType::FORMAT_PROTO: {
-            _cur_reader.reset(
-                    new CsvReader(_state, _profile, &_counter, _params, range, _file_slot_descs));
+            _cur_reader.reset(new CsvReader(_state, _profile, &_counter, _params, range,
+                                            _file_slot_descs, _io_ctx.get()));
             init_status = ((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
             break;
         }
         case TFileFormatType::FORMAT_JSON: {
             _cur_reader.reset(new NewJsonReader(_state, _profile, &_counter, _params, range,
-                                                _file_slot_descs, &_scanner_eof));
+                                                _file_slot_descs, &_scanner_eof, _io_ctx.get()));
             init_status = ((NewJsonReader*)(_cur_reader.get()))->init_reader();
             break;
         }
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 28c0e3d347..4e71f00ed4 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -124,6 +124,9 @@ protected:
 
     VExprContext* _push_down_expr = nullptr;
 
+    std::unique_ptr<FileCacheStatistics> _file_cache_statistics;
+    std::unique_ptr<IOContext> _io_ctx;
+
 private:
     RuntimeProfile::Counter* _get_block_timer = nullptr;
     RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr;
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 32ce808020..6c59e91d46 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -86,6 +86,7 @@ set(HTTP_TEST_FILES
 )
 set(IO_TEST_FILES
     io/cache/remote_file_cache_test.cpp
+    io/cache/file_block_cache_test.cpp
 )
 set(OLAP_TEST_FILES
     olap/engine_storage_migration_task_test.cpp
diff --git a/be/test/io/cache/file_block_cache_test.cpp b/be/test/io/cache/file_block_cache_test.cpp
new file mode 100644
index 0000000000..d24bcc8732
--- /dev/null
+++ b/be/test/io/cache/file_block_cache_test.cpp
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/tests/gtest_lru_file_cache.cpp
+// and modified by Doris
+
+#include <gtest/gtest.h>
+
+#include <filesystem>
+#include <thread>
+
+#include "common/config.h"
+#include "io/cache/block/block_file_cache.h"
+#include "io/cache/block/block_file_cache_settings.h"
+#include "io/cache/block/block_file_segment.h"
+#include "io/cache/block/block_lru_file_cache.h"
+#include "olap/options.h"
+#include "util/slice.h"
+
+namespace doris::io {
+
+namespace fs = std::filesystem;
+
+fs::path caches_dir = fs::current_path() / "lru_cache_test";
+std::string cache_base_path = caches_dir / "cache1" / "";
+
+void assert_range([[maybe_unused]] size_t assert_n, io::FileBlockSPtr file_segment,
+                  const io::FileBlock::Range& expected_range, io::FileBlock::State expected_state) {
+    auto range = file_segment->range();
+
+    ASSERT_EQ(range.left, expected_range.left);
+    ASSERT_EQ(range.right, expected_range.right);
+    ASSERT_EQ(file_segment->state(), expected_state);
+}
+
+std::vector<io::FileBlockSPtr> fromHolder(const io::FileBlocksHolder& holder) {
+    return std::vector<io::FileBlockSPtr>(holder.file_segments.begin(), holder.file_segments.end());
+}
+
+std::string getFileBlockPath(const std::string& base_path, const io::IFileCache::Key& key,
+                             size_t offset) {
+    auto key_str = key.to_string();
+    return fs::path(base_path) / key_str / std::to_string(offset);
+}
+
+void download(io::FileBlockSPtr file_segment) {
+    const auto& key = file_segment->key();
+    size_t size = file_segment->range().size();
+
+    auto key_str = key.to_string();
+    auto subdir = fs::path(cache_base_path) / key_str;
+    ASSERT_TRUE(fs::exists(subdir));
+
+    std::string data(size, '0');
+    Slice result(data.data(), size);
+    file_segment->append(result);
+    file_segment->finalize_write();
+}
+
+void complete(const io::FileBlocksHolder& holder) {
+    for (const auto& file_segment : holder.file_segments) {
+        ASSERT_TRUE(file_segment->get_or_set_downloader() == io::FileBlock::get_caller_id());
+        download(file_segment);
+    }
+}
+
+TEST(LRUFileCache, init) {
+    std::string string = std::string(R"(
+        [
+        {
+            "path" : "/mnt/ssd01/clickbench/hot/be/file_cache",
+            "normal" : 193273528320,
+            "persistent" : 193273528320,
+            "query_limit" : 38654705664
+        },
+        {
+            "path" : "/mnt/ssd01/clickbench/hot/be/file_cache",
+            "normal" : 193273528320,
+            "persistent" : 193273528320,
+            "query_limit" : 38654705664
+        }
+        ]
+        )");
+    config::enable_file_cache_query_limit = true;
+    std::vector<CachePath> cache_paths;
+    parse_conf_cache_paths(string, cache_paths);
+    EXPECT_EQ(cache_paths.size(), 2);
+    for (const auto& cache_path : cache_paths) {
+        io::FileCacheSettings settings = cache_path.init_settings();
+        EXPECT_EQ(settings.max_size, 193273528320);
+        EXPECT_EQ(settings.persistent_max_size, 193273528320);
+        EXPECT_EQ(settings.max_query_cache_size, 38654705664);
+    }
+}
+
+void test_file_cache(bool is_persistent) {
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+
+    TUniqueId other_query_id;
+    other_query_id.hi = 2;
+    other_query_id.lo = 2;
+
+    io::FileCacheSettings settings;
+    settings.max_size = 30;
+    settings.max_elements = 5;
+    settings.persistent_max_size = 30;
+    settings.persistent_max_elements = 5;
+    settings.max_file_segment_size = 100;
+    auto key = io::IFileCache::hash("key1");
+    {
+        io::LRUFileCache cache(cache_base_path, settings);
+        cache.initialize();
+        {
+            auto holder =
+                    cache.get_or_set(key, 0, 10, is_persistent, query_id); /// Add range [0, 9]
+            auto segments = fromHolder(holder);
+            /// Range was not present in cache. It should be added in cache as one while file segment.
+            ASSERT_GE(segments.size(), 1);
+
+            assert_range(1, segments[0], io::FileBlock::Range(0, 9), io::FileBlock::State::EMPTY);
+
+            /// Exception because space not reserved.
+            /// EXPECT_THROW(download(segments[0]), DB::Exception);
+            /// Exception because space can be reserved only by downloader
+            /// EXPECT_THROW(segments[0]->reserve(segments[0]->range().size()), DB::Exception);
+            ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+            assert_range(2, segments[0], io::FileBlock::Range(0, 9),
+                         io::FileBlock::State::DOWNLOADING);
+
+            download(segments[0]);
+            assert_range(3, segments[0], io::FileBlock::Range(0, 9),
+                         io::FileBlock::State::DOWNLOADED);
+        }
+
+        /// Current cache:    [__________]
+        ///                   ^          ^
+        ///                   0          9
+        ASSERT_EQ(cache.get_file_segments_num(is_persistent), 1);
+        ASSERT_EQ(cache.get_used_cache_size(is_persistent), 10);
+
+        {
+            /// Want range [5, 14], but [0, 9] already in cache, so only [10, 14] will be put in cache.
+            auto holder = cache.get_or_set(key, 5, 10, is_persistent, query_id);
+            auto segments = fromHolder(holder);
+            ASSERT_EQ(segments.size(), 2);
+
+            assert_range(4, segments[0], io::FileBlock::Range(0, 9),
+                         io::FileBlock::State::DOWNLOADED);
+            assert_range(5, segments[1], io::FileBlock::Range(10, 14), io::FileBlock::State::EMPTY);
+
+            ASSERT_TRUE(segments[1]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+            download(segments[1]);
+            assert_range(6, segments[1], io::FileBlock::Range(10, 14),
+                         io::FileBlock::State::DOWNLOADED);
+        }
+
+        /// Current cache:    [__________][_____]
+        ///                   ^          ^^     ^
+        ///                   0          910    14
+        ASSERT_EQ(cache.get_file_segments_num(is_persistent), 2);
+        ASSERT_EQ(cache.get_used_cache_size(is_persistent), 15);
+
+        {
+            auto holder = cache.get_or_set(key, 9, 1, is_persistent, query_id); /// Get [9, 9]
+            auto segments = fromHolder(holder);
+            ASSERT_EQ(segments.size(), 1);
+            assert_range(7, segments[0], io::FileBlock::Range(0, 9),
+                         io::FileBlock::State::DOWNLOADED);
+        }
+
+        {
+            auto holder = cache.get_or_set(key, 9, 2, is_persistent, query_id); /// Get [9, 10]
+            auto segments = fromHolder(holder);
+            ASSERT_EQ(segments.size(), 2);
+            assert_range(8, segments[0], io::FileBlock::Range(0, 9),
+                         io::FileBlock::State::DOWNLOADED);
+            assert_range(9, segments[1], io::FileBlock::Range(10, 14),
+                         io::FileBlock::State::DOWNLOADED);
+        }
+
+        {
+            auto holder = cache.get_or_set(key, 10, 1, is_persistent, query_id); /// Get [10, 10]
+            auto segments = fromHolder(holder);
+            ASSERT_EQ(segments.size(), 1);
+            assert_range(10, segments[0], io::FileBlock::Range(10, 14),
+                         io::FileBlock::State::DOWNLOADED);
+        }
+
+        complete(cache.get_or_set(key, 17, 4, is_persistent, query_id)); /// Get [17, 20]
+        complete(cache.get_or_set(key, 24, 3, is_persistent, query_id)); /// Get [24, 26]
+
+        /// Current cache:    [__________][_____]   [____]    [___]
+        ///                   ^          ^^     ^   ^    ^    ^   ^
+        ///                   0          910    14  17   20   24  26
+        ///
+        ASSERT_EQ(cache.get_file_segments_num(is_persistent), 4);
+        ASSERT_EQ(cache.get_used_cache_size(is_persistent), 22);
+
+        {
+            auto holder = cache.get_or_set(key, 0, 26, is_persistent, query_id); /// Get [0, 25]
+            auto segments = fromHolder(holder);
+            ASSERT_EQ(segments.size(), 6);
+
+            assert_range(11, segments[0], io::FileBlock::Range(0, 9),
+                         io::FileBlock::State::DOWNLOADED);
+            assert_range(12, segments[1], io::FileBlock::Range(10, 14),
+                         io::FileBlock::State::DOWNLOADED);
+
+            /// Missing [15, 16] should be added in cache.
+            assert_range(13, segments[2], io::FileBlock::Range(15, 16),
+                         io::FileBlock::State::EMPTY);
+
+            ASSERT_TRUE(segments[2]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+            download(segments[2]);
+
+            assert_range(14, segments[3], io::FileBlock::Range(17, 20),
+                         io::FileBlock::State::DOWNLOADED);
+
+            /// New [21, 23], but will not be added in cache because of elements limit (5)
+            assert_range(15, segments[4], io::FileBlock::Range(21, 23),
+                         io::FileBlock::State::SKIP_CACHE);
+
+            assert_range(16, segments[5], io::FileBlock::Range(24, 26),
+                         io::FileBlock::State::DOWNLOADED);
+
+            /// Current cache:    [__________][_____][   ][____]    [___]
+            ///                   ^                            ^    ^
+            ///                   0                            20   24
+            ///
+
+            /// Range [27, 27] must be evicted in previous getOrSet [0, 25].
+            /// Let's not invalidate pointers to returned segments from range [0, 25] and
+            /// as max elements size is reached, next attempt to put something in cache should fail.
+            /// This will also check that [27, 27] was indeed evicted.
+
+            auto holder1 = cache.get_or_set(key, 27, 1, is_persistent, query_id);
+            auto segments_1 = fromHolder(holder1); /// Get [27, 27]
+            ASSERT_EQ(segments_1.size(), 1);
+            assert_range(17, segments_1[0], io::FileBlock::Range(27, 27),
+                         io::FileBlock::State::SKIP_CACHE);
+        }
+
+        {
+            auto holder = cache.get_or_set(key, 12, 10, is_persistent, query_id); /// Get [12, 21]
+            auto segments = fromHolder(holder);
+            ASSERT_EQ(segments.size(), 4);
+
+            assert_range(18, segments[0], io::FileBlock::Range(10, 14),
+                         io::FileBlock::State::DOWNLOADED);
+            assert_range(19, segments[1], io::FileBlock::Range(15, 16),
+                         io::FileBlock::State::DOWNLOADED);
+            assert_range(20, segments[2], io::FileBlock::Range(17, 20),
+                         io::FileBlock::State::DOWNLOADED);
+
+            assert_range(21, segments[3], io::FileBlock::Range(21, 21),
+                         io::FileBlock::State::EMPTY);
+
+            ASSERT_TRUE(segments[3]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+            download(segments[3]);
+            ASSERT_TRUE(segments[3]->state() == io::FileBlock::State::DOWNLOADED);
+        }
+
+        /// Current cache:    [_____][__][____][_]   [___]
+        ///                   ^          ^       ^   ^   ^
+        ///                   10         17      21  24  26
+
+        ASSERT_EQ(cache.get_file_segments_num(is_persistent), 5);
+
+        {
+            auto holder = cache.get_or_set(key, 23, 5, is_persistent, query_id); /// Get [23, 28]
+            auto segments = fromHolder(holder);
+            ASSERT_EQ(segments.size(), 3);
+
+            assert_range(22, segments[0], io::FileBlock::Range(23, 23),
+                         io::FileBlock::State::EMPTY);
+            assert_range(23, segments[1], io::FileBlock::Range(24, 26),
+                         io::FileBlock::State::DOWNLOADED);
+            assert_range(24, segments[2], io::FileBlock::Range(27, 27),
+                         io::FileBlock::State::EMPTY);
+
+            ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+            ASSERT_TRUE(segments[2]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+            download(segments[0]);
+            download(segments[2]);
+        }
+
+        /// Current cache:    [____][_]  [][___][__]
+        ///                   ^       ^  ^^^   ^^  ^
+        ///                   17      21 2324  26  27
+
+        {
+            auto holder5 = cache.get_or_set(key, 2, 3, is_persistent, query_id); /// Get [2, 4]
+            auto s5 = fromHolder(holder5);
+            ASSERT_EQ(s5.size(), 1);
+            assert_range(25, s5[0], io::FileBlock::Range(2, 4), io::FileBlock::State::EMPTY);
+
+            auto holder1 = cache.get_or_set(key, 30, 2, is_persistent, query_id); /// Get [30, 31]
+            auto s1 = fromHolder(holder1);
+            ASSERT_EQ(s1.size(), 1);
+            assert_range(26, s1[0], io::FileBlock::Range(30, 31), io::FileBlock::State::EMPTY);
+
+            ASSERT_TRUE(s5[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+            ASSERT_TRUE(s1[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+            download(s5[0]);
+            download(s1[0]);
+
+            /// Current cache:    [___]       [_][___][_]   [__]
+            ///                   ^   ^       ^  ^   ^  ^   ^  ^
+            ///                   2   4       23 24  26 27  30 31
+
+            auto holder2 = cache.get_or_set(key, 23, 1, is_persistent, query_id); /// Get [23, 23]
+            auto s2 = fromHolder(holder2);
+            ASSERT_EQ(s2.size(), 1);
+
+            auto holder3 = cache.get_or_set(key, 24, 3, is_persistent, query_id); /// Get [24, 26]
+            auto s3 = fromHolder(holder3);
+            ASSERT_EQ(s3.size(), 1);
+
+            auto holder4 = cache.get_or_set(key, 27, 1, is_persistent, query_id); /// Get [27, 27]
+            auto s4 = fromHolder(holder4);
+            ASSERT_EQ(s4.size(), 1);
+
+            /// All cache is now unreleasable because pointers are still hold
+            auto holder6 = cache.get_or_set(key, 0, 40, is_persistent, query_id);
+            auto f = fromHolder(holder6);
+            ASSERT_EQ(f.size(), 9);
+
+            assert_range(27, f[0], io::FileBlock::Range(0, 1), io::FileBlock::State::SKIP_CACHE);
+            assert_range(28, f[2], io::FileBlock::Range(5, 22), io::FileBlock::State::SKIP_CACHE);
+            assert_range(29, f[6], io::FileBlock::Range(28, 29), io::FileBlock::State::SKIP_CACHE);
+            assert_range(30, f[8], io::FileBlock::Range(32, 39), io::FileBlock::State::SKIP_CACHE);
+        }
+
+        {
+            auto holder = cache.get_or_set(key, 2, 3, is_persistent, query_id); /// Get [2, 4]
+            auto segments = fromHolder(holder);
+            ASSERT_EQ(segments.size(), 1);
+            assert_range(31, segments[0], io::FileBlock::Range(2, 4),
+                         io::FileBlock::State::DOWNLOADED);
+        }
+
+        /// Current cache:    [___]       [_][___][_]   [__]
+        ///                   ^   ^       ^  ^   ^  ^   ^  ^
+        ///                   2   4       23 24  26 27  30 31
+
+        {
+            auto holder = cache.get_or_set(key, 25, 5, is_persistent, query_id); /// Get [25, 29]
+            auto segments = fromHolder(holder);
+            ASSERT_EQ(segments.size(), 3);
+
+            assert_range(32, segments[0], io::FileBlock::Range(24, 26),
+                         io::FileBlock::State::DOWNLOADED);
+            assert_range(33, segments[1], io::FileBlock::Range(27, 27),
+                         io::FileBlock::State::DOWNLOADED);
+
+            assert_range(34, segments[2], io::FileBlock::Range(28, 29),
+                         io::FileBlock::State::EMPTY);
+            ASSERT_TRUE(segments[2]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+            ASSERT_TRUE(segments[2]->state() == io::FileBlock::State::DOWNLOADING);
+
+            bool lets_start_download = false;
+            std::mutex mutex;
+            std::condition_variable cv;
+
+            std::thread other_1([&] {
+                auto holder_2 = cache.get_or_set(key, 25, 5, is_persistent,
+                                                 other_query_id); /// Get [25, 29] once again.
+                auto segments_2 = fromHolder(holder_2);
+                ASSERT_EQ(segments.size(), 3);
+
+                assert_range(35, segments_2[0], io::FileBlock::Range(24, 26),
+                             io::FileBlock::State::DOWNLOADED);
+                assert_range(36, segments_2[1], io::FileBlock::Range(27, 27),
+                             io::FileBlock::State::DOWNLOADED);
+                assert_range(37, segments_2[2], io::FileBlock::Range(28, 29),
+                             io::FileBlock::State::DOWNLOADING);
+
+                ASSERT_TRUE(segments[2]->get_or_set_downloader() != io::FileBlock::get_caller_id());
+                ASSERT_TRUE(segments[2]->state() == io::FileBlock::State::DOWNLOADING);
+
+                {
+                    std::lock_guard lock(mutex);
+                    lets_start_download = true;
+                }
+                cv.notify_one();
+
+                while (segments_2[2]->wait() == io::FileBlock::State::DOWNLOADING) {
+                }
+                ASSERT_TRUE(segments_2[2]->state() == io::FileBlock::State::DOWNLOADED);
+            });
+
+            {
+                std::unique_lock lock(mutex);
+                cv.wait(lock, [&] { return lets_start_download; });
+            }
+
+            download(segments[2]);
+            ASSERT_TRUE(segments[2]->state() == io::FileBlock::State::DOWNLOADED);
+
+            other_1.join();
+        }
+
+        /// Current cache:    [___]       [___][_][__][__]
+        ///                   ^   ^       ^   ^  ^^  ^^  ^
+        ///                   2   4       24  26 27  2930 31
+
+        {
+            /// Now let's check the similar case but getting ERROR state after segment->wait(), when
+            /// state is changed not manually via segment->complete(state) but from destructor of holder
+            /// and notify_all() is also called from destructor of holder.
+
+            std::optional<io::FileBlocksHolder> holder;
+            holder.emplace(cache.get_or_set(key, 3, 23, is_persistent, query_id)); /// Get [3, 25]
+
+            auto segments = fromHolder(*holder);
+            ASSERT_EQ(segments.size(), 3);
+
+            assert_range(38, segments[0], io::FileBlock::Range(2, 4),
+                         io::FileBlock::State::DOWNLOADED);
+
+            assert_range(39, segments[1], io::FileBlock::Range(5, 23), io::FileBlock::State::EMPTY);
+            ASSERT_TRUE(segments[1]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+            ASSERT_TRUE(segments[1]->state() == io::FileBlock::State::DOWNLOADING);
+
+            assert_range(40, segments[2], io::FileBlock::Range(24, 26),
+                         io::FileBlock::State::DOWNLOADED);
+
+            bool lets_start_download = false;
+            std::mutex mutex;
+            std::condition_variable cv;
+
+            std::thread other_1([&] {
+                auto holder_2 = cache.get_or_set(key, 3, 23, is_persistent,
+                                                 other_query_id); /// Get [3, 25] once again
+                auto segments_2 = fromHolder(*holder);
+                ASSERT_EQ(segments_2.size(), 3);
+
+                assert_range(41, segments_2[0], io::FileBlock::Range(2, 4),
+                             io::FileBlock::State::DOWNLOADED);
+                assert_range(42, segments_2[1], io::FileBlock::Range(5, 23),
+                             io::FileBlock::State::DOWNLOADING);
+                assert_range(43, segments_2[2], io::FileBlock::Range(24, 26),
+                             io::FileBlock::State::DOWNLOADED);
+
+                ASSERT_TRUE(segments_2[1]->get_downloader() != io::FileBlock::get_caller_id());
+                ASSERT_TRUE(segments_2[1]->state() == io::FileBlock::State::DOWNLOADING);
+
+                {
+                    std::lock_guard lock(mutex);
+                    lets_start_download = true;
+                }
+                cv.notify_one();
+
+                while (segments_2[1]->wait() == io::FileBlock::State::DOWNLOADING) {
+                }
+                ASSERT_TRUE(segments_2[1]->state() == io::FileBlock::State::EMPTY);
+                ASSERT_TRUE(segments_2[1]->get_or_set_downloader() ==
+                            io::FileBlock::get_caller_id());
+                download(segments_2[1]);
+            });
+
+            {
+                std::unique_lock lock(mutex);
+                cv.wait(lock, [&] { return lets_start_download; });
+            }
+
+            holder.reset();
+            other_1.join();
+            ASSERT_TRUE(segments[1]->state() == io::FileBlock::State::DOWNLOADED);
+        }
+    }
+    /// Current cache:    [___][        ][___][_][__]
+    ///                   ^   ^^         ^   ^^  ^  ^
+    ///                   2   45       24  2627 28 29
+
+    {
+        /// Test LRUCache::restore().
+
+        io::LRUFileCache cache2(cache_base_path, settings);
+        cache2.initialize();
+        auto holder1 = cache2.get_or_set(key, 2, 28, is_persistent, query_id); /// Get [2, 29]
+
+        auto segments1 = fromHolder(holder1);
+        ASSERT_EQ(segments1.size(), 5);
+
+        assert_range(44, segments1[0], io::FileBlock::Range(2, 4),
+                     io::FileBlock::State::DOWNLOADED);
+        assert_range(45, segments1[1], io::FileBlock::Range(5, 23),
+                     io::FileBlock::State::DOWNLOADED);
+        assert_range(45, segments1[2], io::FileBlock::Range(24, 26),
+                     io::FileBlock::State::DOWNLOADED);
+        assert_range(46, segments1[3], io::FileBlock::Range(27, 27),
+                     io::FileBlock::State::DOWNLOADED);
+        assert_range(47, segments1[4], io::FileBlock::Range(28, 29),
+                     io::FileBlock::State::DOWNLOADED);
+    }
+
+    {
+        /// Test max file segment size
+
+        auto settings2 = settings;
+        settings2.max_size = 30;
+        settings2.max_elements = 5;
+        settings2.persistent_max_size = 30;
+        settings2.persistent_max_elements = 5;
+        settings2.max_file_segment_size = 10;
+        io::LRUFileCache cache2(caches_dir / "cache2", settings2);
+
+        auto holder1 = cache2.get_or_set(key, 0, 25, is_persistent, query_id); /// Get [0, 24]
+        auto segments1 = fromHolder(holder1);
+
+        ASSERT_EQ(segments1.size(), 3);
+        assert_range(48, segments1[0], io::FileBlock::Range(0, 9), io::FileBlock::State::EMPTY);
+        assert_range(49, segments1[1], io::FileBlock::Range(10, 19), io::FileBlock::State::EMPTY);
+        assert_range(50, segments1[2], io::FileBlock::Range(20, 24), io::FileBlock::State::EMPTY);
+    }
+}
+
+TEST(LRUFileCache, normal) {
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    test_file_cache(false);
+    test_file_cache(true);
+}
+
+} // namespace doris::io
diff --git a/be/test/olap/primary_key_index_test.cpp b/be/test/olap/primary_key_index_test.cpp
index aca1dc575e..de3c75ae18 100644
--- a/be/test/olap/primary_key_index_test.cpp
+++ b/be/test/olap/primary_key_index_test.cpp
@@ -76,7 +76,7 @@ TEST_F(PrimaryKeyIndexTest, builder) {
     FilePathDesc path_desc(filename);
     PrimaryKeyIndexReader index_reader;
     io::FileReaderSPtr file_reader;
-    EXPECT_TRUE(fs->open_file(filename, &file_reader).ok());
+    EXPECT_TRUE(fs->open_file(filename, &file_reader, nullptr).ok());
     EXPECT_TRUE(index_reader.parse_index(file_reader, index_meta).ok());
     EXPECT_TRUE(index_reader.parse_bf(file_reader, index_meta).ok());
     EXPECT_EQ(num_rows, index_reader.num_rows());
diff --git a/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp b/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp
index fc1e9cd62f..74a5e77bdf 100644
--- a/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp
+++ b/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp
@@ -80,7 +80,8 @@ template <FieldType type>
 void get_bitmap_reader_iter(const std::string& file_name, const ColumnIndexMetaPB& meta,
                             BitmapIndexReader** reader, BitmapIndexIterator** iter) {
     io::FileReaderSPtr file_reader;
-    ASSERT_EQ(io::global_local_filesystem()->open_file(file_name, &file_reader), Status::OK());
+    ASSERT_EQ(io::global_local_filesystem()->open_file(file_name, &file_reader, nullptr),
+              Status::OK());
     *reader = new BitmapIndexReader(std::move(file_reader), &meta.bitmap_index());
     auto st = (*reader)->load(true, false);
     EXPECT_TRUE(st.ok());
diff --git a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp
index 661eb497eb..30061af1d4 100644
--- a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp
+++ b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp
@@ -91,7 +91,7 @@ void get_bloom_filter_reader_iter(const std::string& file_name, const ColumnInde
                                   std::unique_ptr<BloomFilterIndexIterator>* iter) {
     std::string fname = dname + "/" + file_name;
     io::FileReaderSPtr file_reader;
-    ASSERT_EQ(io::global_local_filesystem()->open_file(fname, &file_reader), Status::OK());
+    ASSERT_EQ(io::global_local_filesystem()->open_file(fname, &file_reader, nullptr), Status::OK());
     *reader = new BloomFilterIndexReader(std::move(file_reader), &meta.bloom_filter_index());
     auto st = (*reader)->load(true, false);
     EXPECT_TRUE(st.ok());
diff --git a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp
index a0dd6c7ee5..7eec4b90ed 100644
--- a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp
+++ b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp
@@ -129,7 +129,7 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows,
     }
     auto type_info = get_scalar_type_info(type);
     io::FileReaderSPtr file_reader;
-    ASSERT_EQ(fs->open_file(fname, &file_reader), Status::OK());
+    ASSERT_EQ(fs->open_file(fname, &file_reader, nullptr), Status::OK());
     // read and check
     {
         // sequence read
@@ -305,7 +305,7 @@ void test_array_nullable_data(CollectionValue* src_data, uint8_t* src_is_null, i
     }
     auto type_info = get_type_info(&meta);
     io::FileReaderSPtr file_reader;
-    ASSERT_EQ(fs->open_file(fname, &file_reader), Status::OK());
+    ASSERT_EQ(fs->open_file(fname, &file_reader, nullptr), Status::OK());
     // read and check
     {
         ColumnReaderOptions reader_opts;
diff --git a/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp b/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp
index cb7d730973..03a23ef860 100644
--- a/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp
+++ b/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp
@@ -77,7 +77,7 @@ TEST_F(OrdinalPageIndexTest, normal) {
     }
 
     io::FileReaderSPtr file_reader;
-    EXPECT_TRUE(fs->open_file(filename, &file_reader).ok());
+    EXPECT_TRUE(fs->open_file(filename, &file_reader, nullptr).ok());
     OrdinalIndexReader index(file_reader, &index_meta.ordinal_index(), 16 * 1024 * 4096 + 1);
     EXPECT_TRUE(index.load(true, false).ok());
     EXPECT_EQ(16 * 1024, index.num_data_pages());
diff --git a/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp b/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp
index f7607d8ca9..8df497870d 100644
--- a/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp
+++ b/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp
@@ -84,7 +84,7 @@ public:
         }
 
         io::FileReaderSPtr file_reader;
-        EXPECT_TRUE(fs->open_file(filename, &file_reader).ok());
+        EXPECT_TRUE(fs->open_file(filename, &file_reader, nullptr).ok());
         ZoneMapIndexReader column_zone_map(file_reader, &index_meta.zone_map_index());
         Status status = column_zone_map.load(true, false);
         EXPECT_TRUE(status.ok());
@@ -131,7 +131,7 @@ public:
         }
 
         io::FileReaderSPtr file_reader;
-        EXPECT_TRUE(fs->open_file(filename, &file_reader).ok());
+        EXPECT_TRUE(fs->open_file(filename, &file_reader, nullptr).ok());
         ZoneMapIndexReader column_zone_map(file_reader, &index_meta.zone_map_index());
         Status status = column_zone_map.load(true, false);
         EXPECT_TRUE(status.ok());
@@ -184,7 +184,7 @@ TEST_F(ColumnZoneMapTest, NormalTestIntPage) {
     }
 
     io::FileReaderSPtr file_reader;
-    EXPECT_TRUE(fs->open_file(filename, &file_reader).ok());
+    EXPECT_TRUE(fs->open_file(filename, &file_reader, nullptr).ok());
     ZoneMapIndexReader column_zone_map(file_reader, &index_meta.zone_map_index());
     Status status = column_zone_map.load(true, false);
     EXPECT_TRUE(status.ok());
diff --git a/be/test/runtime/array_test.cpp b/be/test/runtime/array_test.cpp
index 8953578ef5..87f219ade7 100644
--- a/be/test/runtime/array_test.cpp
+++ b/be/test/runtime/array_test.cpp
@@ -372,7 +372,7 @@ private:
 
     io::FileReaderSPtr create_readable_block(const std::string& path) {
         io::FileReaderSPtr reader;
-        auto st = io::global_local_filesystem()->open_file(path, &reader);
+        auto st = io::global_local_filesystem()->open_file(path, &reader, nullptr);
         return st.ok() ? std::move(reader) : nullptr;
     }
 
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 319feb0b8c..3046e48dc5 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -91,7 +91,8 @@ TEST_F(ParquetReaderTest, normal) {
     auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
     io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
     io::FileReaderSPtr reader;
-    local_fs->open_file("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", &reader);
+    local_fs->open_file("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", &reader,
+                        nullptr);
 
     cctz::time_zone ctz;
     TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
@@ -106,7 +107,7 @@ TEST_F(ParquetReaderTest, normal) {
         scan_range.start_offset = 0;
         scan_range.size = 1000;
     }
-    auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, 992, &ctz);
+    auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, 992, &ctz, nullptr);
     p_reader->set_file_reader(reader);
     RuntimeState runtime_state((TQueryGlobals()));
     runtime_state.set_desc_tbl(desc_tbl);
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index f0712cf7c5..9bc9eb97c0 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -50,7 +50,7 @@ TEST_F(ParquetThriftReaderTest, normal) {
     io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
     io::FileReaderSPtr reader;
     auto st = local_fs->open_file("./be/test/exec/test_data/parquet_scanner/localfile.parquet",
-                                  &reader);
+                                  &reader, nullptr);
     EXPECT_TRUE(st.ok());
 
     std::shared_ptr<FileMetaData> meta_data;
@@ -82,7 +82,7 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
     io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
     io::FileReaderSPtr reader;
     auto st = local_fs->open_file("./be/test/exec/test_data/parquet_scanner/hive-complex.parquet",
-                                  &reader);
+                                  &reader, nullptr);
     EXPECT_TRUE(st.ok());
 
     std::shared_ptr<FileMetaData> metadata;
@@ -285,7 +285,7 @@ static void read_parquet_data_and_check(const std::string& parquet_file,
 
     io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
     io::FileReaderSPtr reader;
-    auto st = local_fs->open_file(parquet_file, &reader);
+    auto st = local_fs->open_file(parquet_file, &reader, nullptr);
     EXPECT_TRUE(st.ok());
 
     std::unique_ptr<vectorized::Block> block;
@@ -325,7 +325,7 @@ static void read_parquet_data_and_check(const std::string& parquet_file,
     }
 
     io::FileReaderSPtr result;
-    auto rst = local_fs->open_file(result_file, &result);
+    auto rst = local_fs->open_file(result_file, &result, nullptr);
     EXPECT_TRUE(rst.ok());
     uint8_t result_buf[result->size() + 1];
     result_buf[result->size()] = '\0';
@@ -408,7 +408,7 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
     io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
     io::FileReaderSPtr file_reader;
     auto st = local_fs->open_file("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet",
-                                  &file_reader);
+                                  &file_reader, nullptr);
     EXPECT_TRUE(st.ok());
 
     // prepare metadata
@@ -445,7 +445,7 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
 
     io::FileReaderSPtr result;
     auto rst = local_fs->open_file("./be/test/exec/test_data/parquet_scanner/group-reader.txt",
-                                   &result);
+                                   &result, nullptr);
     EXPECT_TRUE(rst.ok());
     uint8_t result_buf[result->size() + 1];
     result_buf[result->size()] = '\0';


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org