Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/07 16:12:57 UTC

[doris] 03/13: [feature](backup-restore) Add local backup/restore not upload/download by broker (#20492)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 053c8c303f7047e67677eae1c0cc138f54913fb9
Author: Jack Drogon <ja...@gmail.com>
AuthorDate: Wed Jun 7 21:35:15 2023 +0800

    [feature](backup-restore) Add local backup/restore not upload/download by broker (#20492)
---
 be/src/agent/task_worker_pool.cpp                  |  28 ++-
 be/src/runtime/snapshot_loader.cpp                 | 269 +++++++++++++++++++++
 be/src/runtime/snapshot_loader.h                   |   5 +
 be/src/service/backend_service.cpp                 |  14 +-
 .../org/apache/doris/analysis/RestoreStmt.java     |  32 ++-
 .../apache/doris/analysis/ShowSnapshotStmt.java    |  33 ++-
 .../org/apache/doris/backup/BackupHandler.java     | 132 +++++++---
 .../java/org/apache/doris/backup/BackupJob.java    |  25 +-
 .../org/apache/doris/backup/BackupJobInfo.java     |  93 ++++++-
 .../java/org/apache/doris/backup/BackupMeta.java   |  14 ++
 .../java/org/apache/doris/backup/Repository.java   |   2 +
 .../java/org/apache/doris/backup/RestoreJob.java   | 206 +++++++++++++++-
 .../java/org/apache/doris/backup/Snapshot.java     |  69 ++++++
 .../apache/doris/service/FrontendServiceImpl.java  | 194 ++++++++++++++-
 .../java/org/apache/doris/task/DownloadTask.java   |  36 ++-
 gensrc/thrift/AgentService.thrift                  |  11 +
 gensrc/thrift/FrontendService.thrift               |  48 ++++
 gensrc/thrift/Status.thrift                        |   3 +
 18 files changed, 1147 insertions(+), 67 deletions(-)
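
For context, the sketch below shows how this feature is likely driven from SQL, based on the constants and syntax added in this patch (Repository.KEEP_ON_LOCAL_REPO_NAME = "__keep_on_local__" and the new SNAPSHOTTYPE predicate accepted by ShowSnapshotStmt). The statements are an illustration inferred from the diff, not part of the commit; example_db, example_table, and snapshot_label are placeholder names:

    -- Back up to the pseudo-repository that keeps the snapshot on the local BEs
    -- instead of uploading it through a broker (name taken from Repository.java):
    BACKUP SNAPSHOT example_db.snapshot_label
    TO `__keep_on_local__`
    ON (example_table);

    -- List locally kept snapshots via the new SNAPSHOTTYPE filter:
    SHOW SNAPSHOT ON `__keep_on_local__`
    WHERE SNAPSHOT = 'snapshot_label' AND SNAPSHOTTYPE = 'local';

Judging from the new RestoreStmt constructor that takes meta and jobInfo bytes, plus BackupHandler.getSnapshot and the FrontendService.thrift additions, the restore of a locally kept snapshot appears to be driven through an FE RPC rather than by downloading from a remote repository.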

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index f596603c35..d830d592e2 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -761,20 +761,28 @@ void TaskWorkerPool::_download_worker_thread_callback() {
             _tasks.pop_front();
         }
         LOG(INFO) << "get download task. signature=" << agent_task_req.signature
-                  << ", job_id=" << download_request.job_id;
+                  << ", job_id=" << download_request.job_id
+                  << ", task detail: " << apache::thrift::ThriftDebugString(download_request);
 
         // TODO: download
         std::vector<int64_t> downloaded_tablet_ids;
 
-        std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
-                _env, download_request.job_id, agent_task_req.signature,
-                download_request.broker_addr, download_request.broker_prop);
-        Status status = loader->init(
-                download_request.__isset.storage_backend ? download_request.storage_backend
-                                                         : TStorageBackendType::type::BROKER,
-                download_request.__isset.location ? download_request.location : "");
-        if (status.ok()) {
-            status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids);
+        auto status = Status::OK();
+        if (download_request.__isset.remote_tablet_snapshots) {
+            SnapshotLoader loader(_env, download_request.job_id, agent_task_req.signature);
+            loader.remote_http_download(download_request.remote_tablet_snapshots,
+                                        &downloaded_tablet_ids);
+        } else {
+            std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
+                    _env, download_request.job_id, agent_task_req.signature,
+                    download_request.broker_addr, download_request.broker_prop);
+            status = loader->init(
+                    download_request.__isset.storage_backend ? download_request.storage_backend
+                                                             : TStorageBackendType::type::BROKER,
+                    download_request.__isset.location ? download_request.location : "");
+            if (status.ok()) {
+                status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids);
+            }
         }
 
         if (!status.ok()) {
diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp
index 4db803b10e..f1b58fa454 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -34,9 +34,12 @@
 #include <cstring>
 #include <filesystem>
 #include <istream>
+#include <unordered_map>
 #include <utility>
 
 #include "common/logging.h"
+#include "gutil/strings/split.h"
+#include "http/http_client.h"
 #include "io/fs/broker_file_system.h"
 #include "io/fs/file_system.h"
 #include "io/fs/hdfs_file_system.h"
@@ -370,6 +373,272 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
     return status;
 }
 
+Status SnapshotLoader::remote_http_download(
+        const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
+        std::vector<int64_t>* downloaded_tablet_ids) {
+    LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, task id: {}", _job_id,
+                             _task_id);
+    constexpr uint32_t kListRemoteFileTimeout = 15;
+    constexpr uint32_t kDownloadFileMaxRetry = 3;
+    constexpr uint32_t kGetLengthTimeout = 10;
+
+    // check if job has already been cancelled
+    int tmp_counter = 1;
+    RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD));
+    Status status = Status::OK();
+
+    // Before downloading, validate all local and remote tablet snapshots.
+
+    // Step 1: Validate local tablet snapshot paths
+    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        auto& path = remote_tablet_snapshot.local_snapshot_path;
+        bool res = true;
+        RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res));
+        if (!res) {
+            std::stringstream ss;
+            auto err_msg =
+                    fmt::format("snapshot path is not directory or does not exist: {}", path);
+            LOG(WARNING) << err_msg;
+            return Status::RuntimeError(err_msg);
+        }
+    }
+
+    // Step 2: get all local files
+    struct LocalFileStat {
+        uint64_t size;
+        // TODO(Drogon): add md5sum
+    };
+    std::unordered_map<std::string, std::unordered_map<std::string, LocalFileStat>> local_files_map;
+    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
+        std::vector<std::string> local_files;
+        RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files));
+
+        auto& local_filestat = local_files_map[local_path];
+        for (auto& local_file : local_files) {
+            // add file size
+            std::string local_file_path = local_path + "/" + local_file;
+            std::error_code ec;
+            uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
+            if (ec) {
+                LOG(WARNING) << "download file error: " << ec.message();
+                return Status::IOError("can't retrieve file_size of {}, due to {}", local_file_path,
+                                       ec.message());
+            }
+            local_filestat[local_file] = {local_file_size};
+        }
+    }
+
+    // Step 3: Validate remote tablet snapshot paths && remote files map
+    // TODO(Drogon): Add md5sum check
+    // key is remote snapshot paths, value is filelist
+    // get all these use http download action
+    // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
+    int report_counter = 0;
+    int total_num = remote_tablet_snapshots.size();
+    int finished_num = 0;
+    struct RemoteFileStat {
+        // TODO(Drogon): Add md5sum
+        std::string url;
+        uint64_t size;
+    };
+    std::unordered_map<std::string, std::unordered_map<std::string, RemoteFileStat>>
+            remote_files_map;
+    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
+        auto& remote_files = remote_files_map[remote_path];
+        const auto& token = remote_tablet_snapshot.remote_token;
+        const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
+
+        // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
+        std::string remote_url_prefix =
+                fmt::format("http://{}:{}/api/_tablet/_download?token={}&file={}",
+                            remote_be_addr.hostname, remote_be_addr.port, token, remote_path);
+
+        string file_list_str;
+        auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) {
+            RETURN_IF_ERROR(client->init(remote_url_prefix));
+            client->set_timeout_ms(kListRemoteFileTimeout * 1000);
+            return client->execute(&file_list_str);
+        };
+        RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_files_cb));
+        std::vector<string> filename_list =
+                strings::Split(file_list_str, "\n", strings::SkipWhitespace());
+
+        for (const auto& filename : filename_list) {
+            std::string remote_file_url = fmt::format(
+                    "http://{}:{}/api/_tablet/_download?token={}&file={}/{}",
+                    remote_tablet_snapshot.remote_be_addr.hostname,
+                    remote_tablet_snapshot.remote_be_addr.port, remote_tablet_snapshot.remote_token,
+                    remote_tablet_snapshot.remote_snapshot_path, filename);
+
+            // get file length
+            uint64_t file_size = 0;
+            auto get_file_size_cb = [&remote_file_url, &file_size](HttpClient* client) {
+                RETURN_IF_ERROR(client->init(remote_file_url));
+                client->set_timeout_ms(kGetLengthTimeout * 1000);
+                RETURN_IF_ERROR(client->head());
+                RETURN_IF_ERROR(client->get_content_length(&file_size));
+                return Status::OK();
+            };
+            RETURN_IF_ERROR(
+                    HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_size_cb));
+
+            remote_files[filename] = RemoteFileStat {remote_file_url, file_size};
+        }
+    }
+
+    // Step 4: Compare local and remote files && get all need download files
+    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
+                                      TTaskType::type::DOWNLOAD));
+
+        const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
+        const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
+        auto& remote_files = remote_files_map[remote_path];
+        auto& local_files = local_files_map[local_path];
+        auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;
+
+        // get all need download files
+        std::vector<std::string> need_download_files;
+        for (const auto& [remote_file, remote_filestat] : remote_files) {
+            LOG(INFO) << fmt::format("remote file: {}, size: {}", remote_file,
+                                     remote_filestat.size);
+            auto it = local_files.find(remote_file);
+            if (it == local_files.end()) {
+                need_download_files.emplace_back(remote_file);
+                continue;
+            }
+            if (_end_with(remote_file, ".hdr")) {
+                need_download_files.emplace_back(remote_file);
+                continue;
+            }
+
+            if (auto& local_filestat = it->second; local_filestat.size != remote_filestat.size) {
+                need_download_files.emplace_back(remote_file);
+                continue;
+            }
+            // TODO(Drogon): check by md5sum, if not match then download
+
+            LOG(INFO) << fmt::format("file {} already exists, skip download", remote_file);
+        }
+
+        auto local_tablet_id = remote_tablet_snapshot.local_tablet_id;
+        TabletSharedPtr tablet =
+                _env->storage_engine()->tablet_manager()->get_tablet(local_tablet_id);
+        if (tablet == nullptr) {
+            std::stringstream ss;
+            ss << "failed to get local tablet: " << local_tablet_id;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+        DataDir* data_dir = tablet->data_dir();
+
+        // download all need download files
+        uint64_t total_file_size = 0;
+        MonotonicStopWatch watch;
+        watch.start();
+        for (auto& filename : need_download_files) {
+            auto& remote_filestat = remote_files[filename];
+            auto file_size = remote_filestat.size;
+            auto& remote_file_url = remote_filestat.url;
+
+            // check disk capacity
+            if (data_dir->reach_capacity_limit(file_size)) {
+                return Status::InternalError("Disk reach capacity limit");
+            }
+
+            total_file_size += file_size;
+            uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
+            if (estimate_timeout < config::download_low_speed_time) {
+                estimate_timeout = config::download_low_speed_time;
+            }
+
+            std::string local_filename;
+            RETURN_IF_ERROR(_replace_tablet_id(filename, local_tablet_id, &local_filename));
+            std::string local_file_path = local_path + "/" + local_filename;
+
+            LOG(INFO) << "clone begin to download file from: " << remote_file_url
+                      << " to: " << local_file_path << ". size(B): " << file_size
+                      << ", timeout(s): " << estimate_timeout;
+
+            auto download_cb = [&remote_file_url, estimate_timeout, &local_file_path,
+                                file_size](HttpClient* client) {
+                RETURN_IF_ERROR(client->init(remote_file_url));
+                client->set_timeout_ms(estimate_timeout * 1000);
+                RETURN_IF_ERROR(client->download(local_file_path));
+
+                std::error_code ec;
+                // Check file length
+                uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
+                if (ec) {
+                    LOG(WARNING) << "download file error: " << ec.message();
+                    return Status::IOError("can't retrieve file_size of {}, due to {}",
+                                           local_file_path, ec.message());
+                }
+                if (local_file_size != file_size) {
+                    LOG(WARNING) << "download file length error"
+                                 << ", remote_path=" << remote_file_url
+                                 << ", file_size=" << file_size
+                                 << ", local_file_size=" << local_file_size;
+                    return Status::InternalError("downloaded file size is not equal");
+                }
+                chmod(local_file_path.c_str(), S_IRUSR | S_IWUSR);
+                return Status::OK();
+            };
+            RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb));
+
+            // local_files always keep the updated local files
+            local_files[filename] = LocalFileStat {file_size};
+        }
+
+        uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
+        total_time_ms = total_time_ms > 0 ? total_time_ms : 0;
+        double copy_rate = 0.0;
+        if (total_time_ms > 0) {
+            copy_rate = total_file_size / ((double)total_time_ms) / 1000;
+        }
+        LOG(INFO) << fmt::format(
+                "succeed to copy remote tablet {} to local tablet {}, total file size: {} B, cost: "
+                "{} ms, rate: {} MB/s",
+                remote_tablet_id, local_tablet_id, total_file_size, total_time_ms, copy_rate);
+
+        // local_files: contain all remote files and local files
+        // finally, delete local files which are not in remote
+        for (const auto& [local_file, local_filestat] : local_files) {
+            // replace the tablet id in local file name with the remote tablet id,
+            // in order to compare the file name.
+            std::string new_name;
+            Status st = _replace_tablet_id(local_file, remote_tablet_id, &new_name);
+            if (!st.ok()) {
+                LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st
+                             << ". ignore it";
+                continue;
+            }
+            VLOG_CRITICAL << "new file name after replace tablet id: " << new_name;
+            const auto& find = remote_files.find(new_name);
+            if (find != remote_files.end()) {
+                continue;
+            }
+
+            // delete
+            std::string full_local_file = local_path + "/" + local_file;
+            LOG(INFO) << "begin to delete local snapshot file: " << full_local_file
+                      << ", it does not exist in remote";
+            if (remove(full_local_file.c_str()) != 0) {
+                LOG(WARNING) << "failed to delete unknown local file: " << full_local_file
+                             << ", error: " << strerror(errno)
+                             << ", file size: " << local_filestat.size << ", ignore it";
+            }
+        }
+
+        ++finished_num;
+    }
+
+    LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id;
+    return status;
+}
+
 // move the snapshot files in snapshot_path
 // to tablet_path
 // If overwrite, just replace the tablet_path with snapshot_path,
diff --git a/be/src/runtime/snapshot_loader.h b/be/src/runtime/snapshot_loader.h
index 9e7a22d7a3..c0d1f0f708 100644
--- a/be/src/runtime/snapshot_loader.h
+++ b/be/src/runtime/snapshot_loader.h
@@ -33,6 +33,8 @@ namespace io {
 class RemoteFileSystem;
 } // namespace io
 
+class TRemoteTabletSnapshot;
+
 struct FileStat {
     std::string name;
     std::string md5;
@@ -77,6 +79,9 @@ public:
     Status download(const std::map<std::string, std::string>& src_to_dest_path,
                     std::vector<int64_t>* downloaded_tablet_ids);
 
+    Status remote_http_download(const std::vector<TRemoteTabletSnapshot>& remote_tablets,
+                                std::vector<int64_t>* downloaded_tablet_ids);
+
     Status move(const std::string& snapshot_path, TabletSharedPtr tablet, bool overwrite);
 
 private:
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index a5e528b45f..1b4a1ec944 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -396,49 +396,49 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
     }
 
     /// Check args: txn_id, remote_tablet_id, binlog_version, remote_host, remote_port, partition_id, load_id
-    if (request.__isset.txn_id) {
+    if (!request.__isset.txn_id) {
         LOG(WARNING) << "txn_id is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
         tstatus.error_msgs.emplace_back("txn_id is empty");
         return;
     }
-    if (request.__isset.remote_tablet_id) {
+    if (!request.__isset.remote_tablet_id) {
         LOG(WARNING) << "remote_tablet_id is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
         tstatus.error_msgs.emplace_back("remote_tablet_id is empty");
         return;
     }
-    if (request.__isset.binlog_version) {
+    if (!request.__isset.binlog_version) {
         LOG(WARNING) << "binlog_version is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
         tstatus.error_msgs.emplace_back("binlog_version is empty");
         return;
     }
-    if (request.__isset.remote_host) {
+    if (!request.__isset.remote_host) {
         LOG(WARNING) << "remote_host is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
         tstatus.error_msgs.emplace_back("remote_host is empty");
         return;
     }
-    if (request.__isset.remote_port) {
+    if (!request.__isset.remote_port) {
         LOG(WARNING) << "remote_port is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
         tstatus.error_msgs.emplace_back("remote_port is empty");
         return;
     }
-    if (request.__isset.partition_id) {
+    if (!request.__isset.partition_id) {
         LOG(WARNING) << "partition_id is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
         tstatus.error_msgs.emplace_back("partition_id is empty");
         return;
     }
-    if (request.__isset.load_id) {
+    if (!request.__isset.load_id) {
         LOG(WARNING) << "load_id is empty";
         tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
         tstatus.__isset.error_msgs = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
index 679ebd8cb8..2382093d6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.backup.Repository;
 import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
@@ -45,12 +46,22 @@ public class RestoreStmt extends AbstractBackupStmt {
     private int metaVersion = -1;
     private boolean reserveReplica = false;
     private boolean reserveDynamicPartitionEnable = false;
+    private boolean isLocal = false;
+    private byte[] meta = null;
+    private byte[] jobInfo = null;
 
     public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause,
                        Map<String, String> properties) {
         super(labelName, repoName, restoreTableRefClause, properties);
     }
 
+    public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause,
+                       Map<String, String> properties, byte[] meta, byte[] jobInfo) {
+        super(labelName, repoName, restoreTableRefClause, properties);
+        this.meta = meta;
+        this.jobInfo = jobInfo;
+    }
+
     public boolean allowLoad() {
         return allowLoad;
     }
@@ -75,8 +86,23 @@ public class RestoreStmt extends AbstractBackupStmt {
         return reserveDynamicPartitionEnable;
     }
 
+    public boolean isLocal() {
+        return isLocal;
+    }
+
+    public byte[] getMeta() {
+        return meta;
+    }
+
+    public byte[] getJobInfo() {
+        return jobInfo;
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
+        if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
+            isLocal = true;
+        }
         super.analyze(analyzer);
     }
 
@@ -148,8 +174,10 @@ public class RestoreStmt extends AbstractBackupStmt {
             backupTimestamp = copiedProperties.get(PROP_BACKUP_TIMESTAMP);
             copiedProperties.remove(PROP_BACKUP_TIMESTAMP);
         } else {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
-                    "Missing " + PROP_BACKUP_TIMESTAMP + " property");
+            if (!isLocal) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
+                        "Missing " + PROP_BACKUP_TIMESTAMP + " property");
+            }
         }
 
         // meta version
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java
index bdf33ddfec..d10d216b12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java
@@ -28,6 +28,11 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 
 public class ShowSnapshotStmt extends ShowStmt {
+    public enum SnapshotType {
+        REMOTE,
+        LOCAL
+    }
+
     public static final ImmutableList<String> SNAPSHOT_ALL = new ImmutableList.Builder<String>()
             .add("Snapshot").add("Timestamp").add("Status")
             .build();
@@ -39,6 +44,7 @@ public class ShowSnapshotStmt extends ShowStmt {
     private Expr where;
     private String snapshotName;
     private String timestamp;
+    private SnapshotType snapshotType = SnapshotType.REMOTE;
 
     public ShowSnapshotStmt(String repoName, Expr where) {
         this.repoName = repoName;
@@ -87,7 +93,7 @@ public class ShowSnapshotStmt extends ShowStmt {
 
             if (!ok) {
                 throw new AnalysisException("Where clause should looks like: SNAPSHOT = 'your_snapshot_name'"
-                        + " [AND TIMESTAMP = '2018-04-18-19-19-10']");
+                        + " [AND TIMESTAMP = '2018-04-18-19-19-10'] [AND SNAPSHOTTYPE = 'remote' | 'local']");
             }
         }
     }
@@ -116,10 +122,25 @@ public class ShowSnapshotStmt extends ShowStmt {
                 return false;
             }
             return true;
+        } else if (name.equalsIgnoreCase("snapshotType")) {
+            String snapshotTypeVal = ((StringLiteral) val).getStringValue();
+            if (Strings.isNullOrEmpty(snapshotTypeVal)) {
+                return false;
+            }
+            // snapshotType currently only supports "remote" and "local"
+            switch (snapshotTypeVal.toLowerCase()) {
+                case "remote":
+                    snapshotType = SnapshotType.REMOTE;
+                    return true;
+                case "local":
+                    snapshotType = SnapshotType.LOCAL;
+                    return true;
+                default:
+                    return false;
+            }
+        } else {
+            return false;
         }
-
-        return false;
-
     }
 
     public String getRepoName() {
@@ -134,6 +155,10 @@ public class ShowSnapshotStmt extends ShowStmt {
         return timestamp;
     }
 
+    public String getSnapshotType() {
+        return snapshotType.name();
+    }
+
     @Override
     public ShowResultSetMetaData getMetaData() {
         ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index d149ad574d..6619c457b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -46,6 +46,7 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.task.DirMoveTask;
@@ -56,13 +57,16 @@ import org.apache.doris.thrift.TFinishTaskRequest;
 import org.apache.doris.thrift.TTaskType;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
@@ -75,7 +79,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -104,6 +110,12 @@ public class BackupHandler extends MasterDaemon implements Writable {
 
     private Env env;
 
+    // Map from backup label name to its Snapshot (serialized meta and job info bytes).
+    // This map is not persisted; it exists only in the FE master's memory.
+    // Each label keeps only the most recent snapshot info.
+    private final Map<String, Snapshot> localSnapshots = new HashMap<>();
+    private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock();
+
     public BackupHandler() {
         // for persist
     }
@@ -241,9 +253,13 @@ public class BackupHandler extends MasterDaemon implements Writable {
     public void process(AbstractBackupStmt stmt) throws DdlException {
         // check if repo exist
         String repoName = stmt.getRepoName();
-        Repository repository = repoMgr.getRepo(repoName);
-        if (repository == null) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository " + repoName + " does not exist");
+        Repository repository = null;
+        if (!repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
+            repository = repoMgr.getRepo(repoName);
+            if (repository == null) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                        "Repository " + repoName + " does not exist");
+            }
         }
 
         // check if db exist
@@ -286,7 +302,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
     }
 
     private void backup(Repository repository, Database db, BackupStmt stmt) throws DdlException {
-        if (repository.isReadOnly()) {
+        if (repository != null && repository.isReadOnly()) {
             ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository " + repository.getName()
                     + " is read only");
         }
@@ -357,25 +373,29 @@ public class BackupHandler extends MasterDaemon implements Writable {
         }
 
         // Check if label already be used
-        List<String> existSnapshotNames = Lists.newArrayList();
-        Status st = repository.listSnapshots(existSnapshotNames);
-        if (!st.ok()) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, st.getErrMsg());
-        }
-        if (existSnapshotNames.contains(stmt.getLabel())) {
-            if (stmt.getType() == BackupType.FULL) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Snapshot with name '"
-                        + stmt.getLabel() + "' already exist in repository");
-            } else {
-                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Currently does not support "
-                        + "incremental backup");
+        long repoId = -1;
+        if (repository != null) {
+            List<String> existSnapshotNames = Lists.newArrayList();
+            Status st = repository.listSnapshots(existSnapshotNames);
+            if (!st.ok()) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, st.getErrMsg());
+            }
+            if (existSnapshotNames.contains(stmt.getLabel())) {
+                if (stmt.getType() == BackupType.FULL) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Snapshot with name '"
+                            + stmt.getLabel() + "' already exist in repository");
+                } else {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Currently does not support "
+                            + "incremental backup");
+                }
             }
+            repoId = repository.getId();
         }
 
         // Create a backup job
         BackupJob backupJob = new BackupJob(stmt.getLabel(), db.getId(),
                 ClusterNamespace.getNameFromFullName(db.getFullName()),
-                tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repository.getId());
+                tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId);
         // write log
         env.getEditLog().logBackupJob(backupJob);
 
@@ -386,26 +406,62 @@ public class BackupHandler extends MasterDaemon implements Writable {
     }
 
     private void restore(Repository repository, Database db, RestoreStmt stmt) throws DdlException {
-        // Check if snapshot exist in repository
-        List<BackupJobInfo> infos = Lists.newArrayList();
-        Status status = repository.getSnapshotInfoFile(stmt.getLabel(), stmt.getBackupTimestamp(), infos);
-        if (!status.ok()) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                                           "Failed to get info of snapshot '" + stmt.getLabel() + "' because: "
-                                                   + status.getErrMsg() + ". Maybe specified wrong backup timestamp");
+        BackupJobInfo jobInfo;
+        if (stmt.isLocal()) {
+            String jobInfoString = new String(stmt.getJobInfo());
+            jobInfo = BackupJobInfo.genFromJson(jobInfoString);
+
+            if (jobInfo.extraInfo == null) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info empty");
+            }
+            if (jobInfo.extraInfo.beNetworkMap == null) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info be network map");
+            }
+            if (Strings.isNullOrEmpty(jobInfo.extraInfo.token)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info token");
+            }
+        } else {
+            // Check if snapshot exist in repository
+            List<BackupJobInfo> infos = Lists.newArrayList();
+            Status status = repository.getSnapshotInfoFile(stmt.getLabel(), stmt.getBackupTimestamp(), infos);
+            if (!status.ok()) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                        "Failed to get info of snapshot '" + stmt.getLabel() + "' because: "
+                                + status.getErrMsg() + ". Maybe specified wrong backup timestamp");
+            }
+
+            // Check if all restore objects are exist in this snapshot.
+            // Also remove all unrelated objs
+            Preconditions.checkState(infos.size() == 1);
+            jobInfo = infos.get(0);
         }
 
-        // Check if all restore objects are exist in this snapshot.
-        // Also remove all unrelated objs
-        Preconditions.checkState(infos.size() == 1);
-        BackupJobInfo jobInfo = infos.get(0);
         checkAndFilterRestoreObjsExistInSnapshot(jobInfo, stmt.getAbstractBackupTableRefClause());
 
         // Create a restore job
-        RestoreJob restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
+        RestoreJob restoreJob;
+        if (stmt.isLocal()) {
+            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(stmt.getMeta());
+            DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
+            try {
+                BackupMeta backupMeta = BackupMeta.read(dataInputStream);
+                String backupTimestamp =
+                        TimeUtils.longToTimeString(jobInfo.getBackupTime(), TimeUtils.DATETIME_FORMAT_WITH_HYPHEN);
+                restoreJob = new RestoreJob(stmt.getLabel(), backupTimestamp,
+                        db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
+                        stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(),
+                        stmt.reserveDynamicPartitionEnable(),
+                        env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
+            } catch (IOException e) {
+                throw new DdlException(e.getMessage());
+            }
+        } else {
+            restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
                 db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
                 stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(),
                 env, repository.getId());
+        }
+
         env.getEditLog().logRestoreJob(restoreJob);
 
         // must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed.
@@ -667,6 +723,24 @@ public class BackupHandler extends MasterDaemon implements Writable {
         return false;
     }
 
+    public void addSnapshot(String labelName, Snapshot snapshot) {
+        localSnapshotsLock.writeLock().lock();
+        try {
+            localSnapshots.put(labelName, snapshot);
+        } finally {
+            localSnapshotsLock.writeLock().unlock();
+        }
+    }
+
+    public Snapshot getSnapshot(String labelName) {
+        localSnapshotsLock.readLock().lock();
+        try {
+            return localSnapshots.get(labelName);
+        } finally {
+            localSnapshotsLock.readLock().unlock();
+        }
+    }
+
     public static BackupHandler read(DataInput in) throws IOException {
         BackupHandler backupHandler = new BackupHandler();
         backupHandler.readFields(in);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 2300cd1e01..60d486c310 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -114,6 +114,9 @@ public class BackupJob extends AbstractJob {
     // backup properties
     private Map<String, String> properties = Maps.newHashMap();
 
+    private byte[] metaInfoBytes = null;
+    private byte[] jobInfoBytes = null;
+
     public BackupJob() {
         super(JobType.BACKUP);
     }
@@ -282,7 +285,7 @@ public class BackupJob extends AbstractJob {
         }
 
         // get repo if not set
-        if (repo == null) {
+        if (repo == null && repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
             repo = env.getBackupHandler().getRepoMgr().getRepo(repoId);
             if (repo == null) {
                 status = new Status(ErrCode.COMMON_ERROR, "failed to get repository: " + repoId);
@@ -565,6 +568,11 @@ public class BackupJob extends AbstractJob {
     }
 
     private void uploadSnapshot() {
+        if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
+            state = BackupJobState.UPLOADING;
+            return;
+        }
+
         // reuse this set to save all unfinished tablets
         unfinishedTaskIds.clear();
         taskProgress.clear();
@@ -673,6 +681,8 @@ public class BackupJob extends AbstractJob {
             }
             backupMeta.writeToFile(metaInfoFile);
             localMetaInfoFilePath = metaInfoFile.getAbsolutePath();
+            // read meta info to metaInfoBytes
+            metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
 
             // 3. save job info file
             jobInfo = BackupJobInfo.fromCatalog(createTime, label, dbName, dbId,
@@ -685,6 +695,8 @@ public class BackupJob extends AbstractJob {
             }
             jobInfo.writeToFile(jobInfoFile);
             localJobInfoFilePath = jobInfoFile.getAbsolutePath();
+            // read job info to jobInfoBytes
+            jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
         } catch (Exception e) {
             status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage());
             return;
@@ -697,7 +709,9 @@ public class BackupJob extends AbstractJob {
         jobInfo = null;
 
         // release all snapshots before clearing the snapshotInfos.
-        releaseSnapshots();
+        if (repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
+            releaseSnapshots();
+        }
 
         snapshotInfos.clear();
 
@@ -724,6 +738,13 @@ public class BackupJob extends AbstractJob {
     }
 
     private void uploadMetaAndJobInfoFile() {
+        if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
+            state = BackupJobState.FINISHED;
+            Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
+            env.getBackupHandler().addSnapshot(label, snapshot);
+            return;
+        }
+
         String remoteMetaInfoFile = repo.assembleMetaInfoFilePath(label);
         if (!uploadFile(localMetaInfoFilePath, remoteMetaInfoFile)) {
             return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
index ec622dbdca..4457740440 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
@@ -36,6 +36,7 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TNetworkAddress;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
@@ -92,10 +93,39 @@ public class BackupJobInfo implements Writable {
     @SerializedName("meta_version")
     public int metaVersion;
 
+    @SerializedName("tablet_be_map")
+    public Map<Long, Long> tabletBeMap = Maps.newHashMap();
+
+    @SerializedName("tablet_snapshot_path_map")
+    public Map<Long, String> tabletSnapshotPathMap = Maps.newHashMap();
+
+    public static class ExtraInfo {
+        public static class NetworkAddrss {
+            @SerializedName("ip")
+            public String ip;
+            @SerializedName("port")
+            public int port;
+        }
+
+        @SerializedName("be_network_map")
+        public Map<Long, NetworkAddrss> beNetworkMap = Maps.newHashMap();
+
+        @SerializedName("token")
+        public String token;
+    }
+
+    @SerializedName("extra_info")
+    public ExtraInfo extraInfo;
+
+
     // This map is used to save the table alias mapping info when processing a restore job.
     // origin -> alias
     public Map<String, String> tblAlias = Maps.newHashMap();
 
+    public long getBackupTime() {
+        return backupTime;
+    }
+
     public void initBackupJobInfoAfterDeserialize() {
         // transform success
         if (successJson.equals("succeed")) {
@@ -487,6 +517,62 @@ public class BackupJobInfo implements Writable {
         return Joiner.on("/").join(pathSeg);
     }
 
+    // struct TRemoteTabletSnapshot {
+    //     1: optional i64 local_tablet_id
+    //     2: optional string local_snapshot_path
+    //     3: optional i64 remote_tablet_id
+    //     4: optional i64 remote_be_id
+    //     5: optional Types.TSchemaHash schema_hash
+    //     6: optional Types.TNetworkAddress remote_be_addr
+    //     7: optional string remote_snapshot_path
+    //     8: optional string token
+    // }
+
+    public String getTabletSnapshotPath(Long tabletId) {
+        return tabletSnapshotPathMap.get(tabletId);
+    }
+
+    public Long getBeId(Long tabletId) {
+        return tabletBeMap.get(tabletId);
+    }
+
+    public String getToken() {
+        return extraInfo.token;
+    }
+
+    public TNetworkAddress getBeAddr(Long beId) {
+        ExtraInfo.NetworkAddrss addr = extraInfo.beNetworkMap.get(beId);
+        if (addr == null) {
+            return null;
+        }
+
+        return new TNetworkAddress(addr.ip, addr.port);
+    }
+
+    // TODO(Drogon): improve the performance of this lookup
+    public Long getSchemaHash(long tableId, long partitionId, long indexId) {
+        for (BackupOlapTableInfo backupOlapTableInfo : backupOlapTableObjects.values()) {
+            if (backupOlapTableInfo.id != tableId) {
+                continue;
+            }
+
+            for (BackupPartitionInfo backupPartitionInfo : backupOlapTableInfo.partitions.values()) {
+                if (backupPartitionInfo.id != partitionId) {
+                    continue;
+                }
+
+                for (BackupIndexInfo backupIndexInfo : backupPartitionInfo.indexes.values()) {
+                    if (backupIndexInfo.id != indexId) {
+                        continue;
+                    }
+
+                    return Long.valueOf(backupIndexInfo.schemaHash);
+                }
+            }
+        }
+        return null;
+    }
+
     public static BackupJobInfo fromCatalog(long backupTime, String label, String dbName, long dbId,
                                             BackupContent content, BackupMeta backupMeta,
                                             Map<Long, SnapshotInfo> snapshotInfos) {
@@ -526,8 +612,11 @@ public class BackupJobInfo implements Writable {
                             }
                         } else {
                             for (Tablet tablet : index.getTablets()) {
+                                SnapshotInfo snapshotInfo = snapshotInfos.get(tablet.getId());
                                 idxInfo.tablets.put(tablet.getId(),
-                                        Lists.newArrayList(snapshotInfos.get(tablet.getId()).getFiles()));
+                                        Lists.newArrayList(snapshotInfo.getFiles()));
+                                jobInfo.tabletBeMap.put(tablet.getId(), snapshotInfo.getBeId());
+                                jobInfo.tabletSnapshotPathMap.put(tablet.getId(), snapshotInfo.getPath());
                             }
                         }
                         idxInfo.tabletsOrder.addAll(index.getTabletIdsInOrder());
@@ -578,7 +667,7 @@ public class BackupJobInfo implements Writable {
         return genFromJson(json);
     }
 
-    private static BackupJobInfo genFromJson(String json) {
+    public static BackupJobInfo genFromJson(String json) {
         /* parse the json string:
          * {
          *   "backup_time": 1522231864000,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
index e059d55be6..e22bb7f33c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
@@ -21,8 +21,10 @@ import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.meta.MetaContext;
+import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -38,10 +40,13 @@ import java.util.Map;
 public class BackupMeta implements Writable {
 
     // tbl name -> tbl
+    @SerializedName(value = "tblNameMap")
     private Map<String, Table> tblNameMap = Maps.newHashMap();
     // tbl id -> tbl
+    @SerializedName(value = "tblIdMap")
     private Map<Long, Table> tblIdMap = Maps.newHashMap();
     // resource name -> resource
+    @SerializedName(value = "resourceNameMap")
     private Map<String, Resource> resourceNameMap = Maps.newHashMap();
 
     private BackupMeta() {
@@ -136,4 +141,13 @@ public class BackupMeta implements Writable {
             resourceNameMap.put(resource.getName(), resource);
         }
     }
+
+    public String toJson() {
+        return GsonUtils.GSON.toJson(this);
+    }
+
+    @Override
+    public String toString() {
+        return toJson();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
index 6d421332dc..ba95a77352 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
@@ -97,6 +97,8 @@ public class Repository implements Writable {
     public static final String FILE_REPO_INFO = "__repo_info";
     public static final String FILE_META_INFO = "__meta";
     public static final String DIR_SNAPSHOT_CONTENT = "__ss_content";
+    public static final String KEEP_ON_LOCAL_REPO_NAME = "__keep_on_local__";
+    public static final long KEEP_ON_LOCAL_REPO_ID = -1;
     private static final Logger LOG = LogManager.getLogger(Repository.class);
     private static final String PATH_DELIMITER = "/";
     private static final String CHECKSUM_SEPARATOR = ".";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 60ff99c4b9..e9e2eee824 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -71,6 +71,8 @@ import org.apache.doris.task.DownloadTask;
 import org.apache.doris.task.ReleaseSnapshotTask;
 import org.apache.doris.task.SnapshotTask;
 import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TRemoteTabletSnapshot;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
@@ -185,6 +187,18 @@ public class RestoreJob extends AbstractJob {
         properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable));
     }
 
+    public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
+            ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
+            boolean reserveDynamicPartitionEnable, Env env, long repoId, BackupMeta backupMeta) {
+        this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica,
+                reserveDynamicPartitionEnable, env, repoId);
+        this.backupMeta = backupMeta;
+    }
+
+    public boolean isFromLocalSnapshot() {
+        return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
+    }
+
     public RestoreJobState getState() {
         return state;
     }
@@ -324,7 +338,7 @@ public class RestoreJob extends AbstractJob {
         }
 
         // get repo if not set
-        if (repo == null) {
+        if (repo == null && !isFromLocalSnapshot()) {
             repo = env.getBackupHandler().getRepoMgr().getRepo(repoId);
             if (repo == null) {
                 status = new Status(ErrCode.COMMON_ERROR, "failed to get repository: " + repoId);
@@ -1109,6 +1123,15 @@ public class RestoreJob extends AbstractJob {
     }
 
     private boolean downloadAndDeserializeMetaInfo() {
+        if (isFromLocalSnapshot()) {
+            if (backupMeta != null) {
+                return true;
+            }
+
+            status = new Status(ErrCode.COMMON_ERROR, "backupMeta is null");
+            return false;
+        }
+
         List<BackupMeta> backupMetas = Lists.newArrayList();
         Status st = repo.getSnapshotMetaFile(jobInfo.name, backupMetas,
                 this.metaVersion == -1 ? jobInfo.metaVersion : this.metaVersion);
@@ -1251,7 +1274,15 @@ public class RestoreJob extends AbstractJob {
     }
 
     private void downloadSnapshots() {
-        // Categorize snapshot infos by db id.
+        if (isFromLocalSnapshot()) {
+            downloadLocalSnapshots();
+        } else {
+            downloadRemoteSnapshots();
+        }
+    }
+
+    private void downloadRemoteSnapshots() {
+        // Categorize snapshot infos by db id.
         ArrayListMultimap<Long, SnapshotInfo> dbToSnapshotInfos = ArrayListMultimap.create();
         for (SnapshotInfo info : snapshotInfos.values()) {
             dbToSnapshotInfos.put(info.getDbId(), info);
@@ -1289,7 +1320,8 @@ public class RestoreJob extends AbstractJob {
                     LOG.debug("backend {} has {} batch, total {} tasks, {}",
                               beId, batchNum, totalNum, this);
 
-                    List<FsBroker> brokerAddrs = Lists.newArrayList();
+                    List<FsBroker> brokerAddrs = null;
+                    brokerAddrs = Lists.newArrayList();
                     Status st = repo.getBrokerAddress(beId, env, brokerAddrs);
                     if (!st.ok()) {
                         status = st;
@@ -1401,6 +1433,174 @@ public class RestoreJob extends AbstractJob {
         LOG.info("finished to send download tasks to BE. num: {}. {}", batchTask.getTaskNum(), this);
     }
 
+    private void downloadLocalSnapshots() {
+        // Categorize snapshot infos by db id.
+        ArrayListMultimap<Long, SnapshotInfo> dbToSnapshotInfos = ArrayListMultimap.create();
+        for (SnapshotInfo info : snapshotInfos.values()) {
+            dbToSnapshotInfos.put(info.getDbId(), info);
+        }
+
+        // Send download tasks
+        unfinishedSignatureToId.clear();
+        taskProgress.clear();
+        taskErrMsg.clear();
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (long dbId : dbToSnapshotInfos.keySet()) {
+            List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
+
+            Database db = env.getInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                status = new Status(ErrCode.NOT_FOUND, "db " + dbId + " does not exist");
+                return;
+            }
+
+            // We classify the snapshot info by backend
+            ArrayListMultimap<Long, SnapshotInfo> beToSnapshots = ArrayListMultimap.create();
+            for (SnapshotInfo info : infos) {
+                beToSnapshots.put(info.getBeId(), info);
+            }
+
+            db.readLock();
+            try {
+                for (Long beId : beToSnapshots.keySet()) {
+                    List<SnapshotInfo> beSnapshotInfos = beToSnapshots.get(beId);
+                    int totalNum = beSnapshotInfos.size();
+                    // each backend allot at most 3 tasks
+                    int batchNum = Math.min(totalNum, 3);
+                    // each task contains several download sub tasks
+                    int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
+
+                    // allot tasks
+                    int index = 0;
+                    for (int batch = 0; batch < batchNum; batch++) {
+                        List<TRemoteTabletSnapshot> remoteTabletSnapshots = Lists.newArrayList();
+                        int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch;
+                        for (int j = 0; j < currentBatchTaskNum; j++) {
+                            TRemoteTabletSnapshot remoteTabletSnapshot = new TRemoteTabletSnapshot();
+
+                            SnapshotInfo info = beSnapshotInfos.get(index++);
+                            Table tbl = db.getTableNullable(info.getTblId());
+                            if (tbl == null) {
+                                status = new Status(ErrCode.NOT_FOUND, "restored table "
+                                        + info.getTabletId() + " does not exist");
+                                return;
+                            }
+                            OlapTable olapTbl = (OlapTable) tbl;
+                            olapTbl.readLock();
+                            try {
+                                Partition part = olapTbl.getPartition(info.getPartitionId());
+                                if (part == null) {
+                                    status = new Status(ErrCode.NOT_FOUND, "partition "
+                                            + info.getPartitionId() + " does not exist in restored table: "
+                                            + tbl.getName());
+                                    return;
+                                }
+
+                                MaterializedIndex idx = part.getIndex(info.getIndexId());
+                                if (idx == null) {
+                                    status = new Status(ErrCode.NOT_FOUND, "index " + info.getIndexId()
+                                            + " does not exist in partition " + part.getName()
+                                            + " of restored table " + tbl.getName());
+                                    return;
+                                }
+
+                                Tablet tablet = idx.getTablet(info.getTabletId());
+                                if (tablet == null) {
+                                    status = new Status(ErrCode.NOT_FOUND,
+                                            "tablet " + info.getTabletId() + " does not exist in restored table "
+                                                    + tbl.getName());
+                                    return;
+                                }
+
+                                Replica replica = tablet.getReplicaByBackendId(info.getBeId());
+                                if (replica == null) {
+                                    status = new Status(ErrCode.NOT_FOUND,
+                                            "replica in be " + info.getBeId() + " of tablet "
+                                                    + tablet.getId() + " does not exist in restored table "
+                                                    + tbl.getName());
+                                    return;
+                                }
+
+                                IdChain catalogIds = new IdChain(tbl.getId(), part.getId(), idx.getId(),
+                                        info.getTabletId(), replica.getId());
+                                IdChain repoIds = fileMapping.get(catalogIds);
+                                if (repoIds == null) {
+                                    status = new Status(ErrCode.NOT_FOUND,
+                                            "failed to get id mapping of catalog ids: " + catalogIds.toString());
+                                    return;
+                                }
+
+                                SnapshotInfo snapshotInfo = snapshotInfos.get(info.getTabletId(), info.getBeId());
+                                Preconditions.checkNotNull(snapshotInfo, info.getTabletId() + "-" + info.getBeId());
+                                // download into the snapshot dir that already exists on the local BE
+                                String dest = snapshotInfo.getTabletPath();
+
+                                Long localTabletId = info.getTabletId();
+                                String localSnapshotPath = dest;
+                                Long remoteTabletId = repoIds.getTabletId();
+                                Long remoteBeId = jobInfo.getBeId(remoteTabletId);
+                                String remoteSnapshotPath = jobInfo.getTabletSnapshotPath(remoteTabletId);
+                                if (remoteSnapshotPath == null) {
+                                    status = new Status(ErrCode.NOT_FOUND,
+                                            "failed to get remote snapshot path of tablet: " + remoteTabletId);
+                                    return;
+                                }
+                                Long schemaHash = jobInfo.getSchemaHash(
+                                        repoIds.getTblId(), repoIds.getPartId(), repoIds.getIdxId());
+                                if (schemaHash == null) {
+                                    status = new Status(ErrCode.NOT_FOUND,
+                                            "failed to get schema hash of table: " + repoIds.getTblId()
+                                                    + ", partition: " + repoIds.getPartId()
+                                                    + ", index: " + repoIds.getIdxId());
+                                    return;
+                                }
+                                // remoteSnapshotPath = "${remoteSnapshotPath}/${remoteTabletId}/${schemaHash}"
+                                remoteSnapshotPath =
+                                        String.format("%s/%d/%d", remoteSnapshotPath, remoteTabletId, schemaHash);
+                                TNetworkAddress remoteBeAddr = jobInfo.getBeAddr(remoteBeId);
+                                if (remoteBeAddr == null) {
+                                    status = new Status(ErrCode.NOT_FOUND,
+                                            "failed to get remote be address of be: " + remoteBeId);
+                                    return;
+                                }
+                                String remoteToken = jobInfo.getToken();
+
+                                remoteTabletSnapshot.setLocalTabletId(localTabletId);
+                                remoteTabletSnapshot.setLocalSnapshotPath(localSnapshotPath);
+                                remoteTabletSnapshot.setRemoteTabletId(remoteTabletId);
+                                remoteTabletSnapshot.setRemoteBeId(remoteBeId);
+                                remoteTabletSnapshot.setRemoteBeAddr(remoteBeAddr);
+                                remoteTabletSnapshot.setRemoteSnapshotPath(remoteSnapshotPath);
+                                remoteTabletSnapshot.setRemoteToken(remoteToken);
+
+                                remoteTabletSnapshots.add(remoteTabletSnapshot);
+                            } finally {
+                                olapTbl.readUnlock();
+                            }
+                        }
+                        long signature = env.getNextId();
+                        DownloadTask task = new DownloadTask(null, beId, signature, jobId, dbId, remoteTabletSnapshots);
+                        batchTask.addTask(task);
+                        unfinishedSignatureToId.put(signature, beId);
+                    }
+                }
+            } finally {
+                db.readUnlock();
+            }
+        }
+
+        // send task
+        for (AgentTask task : batchTask.getAllTasks()) {
+            AgentTaskQueue.addTask(task);
+        }
+        AgentTaskExecutor.submit(batchTask);
+
+        state = RestoreJobState.DOWNLOADING;
+
+        // No edit log here
+        LOG.info("finished to send download tasks to BE. num: {}. {}", batchTask.getTaskNum(), this);
+    }
+
     private void waitingAllDownloadFinished() {
         if (unfinishedSignatureToId.isEmpty()) {
             downloadFinishedTime = System.currentTimeMillis();
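
For readers following the batching logic above: each backend's snapshots are split into at most three download tasks, and the last batch absorbs the remainder of the integer division. A minimal standalone sketch of that arithmetic (illustrative only, not part of the patch; it assumes totalNum >= 1, as in the caller):

    import java.util.ArrayList;
    import java.util.List;

    // Mirrors the batch split in downloadLocalSnapshots(): at most 3 batches per backend,
    // the last batch takes whatever is left after the integer division.
    public class BatchSplitSketch {
        public static List<Integer> batchSizes(int totalNum) {
            List<Integer> sizes = new ArrayList<>();
            int batchNum = Math.min(totalNum, 3);
            int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
            int index = 0;
            for (int batch = 0; batch < batchNum; batch++) {
                int current = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch;
                sizes.add(current);
                index += current;
            }
            return sizes; // e.g. batchSizes(7) -> [2, 2, 3]
        }
    }
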
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
new file mode 100644
index 0000000000..b26cb2e1e7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.backup;
+
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+public class Snapshot {
+    @SerializedName(value = "label")
+    private String label = null;
+
+    @SerializedName(value = "meta")
+    private byte[] meta = null;
+
+    @SerializedName(value = "jobInfo")
+    private byte[] jobInfo = null;
+
+    @SerializedName(value = "createTime")
+    private String createTime = null;
+
+    public Snapshot() {
+    }
+
+    public Snapshot(String label, byte[] meta, byte[] jobInfo) {
+        this.label = label;
+        this.meta = meta;
+        this.jobInfo = jobInfo;
+    }
+
+
+    public byte[] getMeta() {
+        return meta;
+    }
+
+    public byte[] getJobInfo() {
+        return jobInfo;
+    }
+
+    public String toJson() {
+        return GsonUtils.GSON.toJson(this);
+    }
+
+    @Override
+    public String toString() {
+        // return toJson();
+        return "Snapshot{"
+                + "label='" + label + '\''
+                + ", meta=" + meta
+                + ", jobInfo=" + jobInfo
+                + ", createTime='" + createTime + '\''
+                + '}';
+    }
+}
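
A minimal usage sketch of the new Snapshot container (illustrative only, not part of the patch; the label and byte payloads are placeholders, and in the actual flow they presumably come from the serialized BackupMeta and BackupJobInfo):

    import java.nio.charset.StandardCharsets;

    import org.apache.doris.backup.Snapshot;

    public class SnapshotUsageSketch {
        public static void main(String[] args) {
            // Placeholder payloads; real callers pass serialized backup meta / job info bytes.
            byte[] meta = "serialized-backup-meta".getBytes(StandardCharsets.UTF_8);
            byte[] jobInfo = "serialized-backup-job-info".getBytes(StandardCharsets.UTF_8);

            Snapshot snapshot = new Snapshot("backup_label_20230607", meta, jobInfo);
            System.out.println(snapshot.toJson()); // rendered by GsonUtils.GSON
        }
    }
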
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index b650100823..2dbb007498 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -19,11 +19,15 @@ package org.apache.doris.service;
 
 import org.apache.doris.alter.SchemaChangeHandler;
 import org.apache.doris.analysis.AddColumnsClause;
+import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.ColumnDef;
+import org.apache.doris.analysis.LabelName;
+import org.apache.doris.analysis.RestoreStmt;
 import org.apache.doris.analysis.SetType;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.TypeDef;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.backup.Snapshot;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
@@ -63,6 +67,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ConnectProcessor;
+import org.apache.doris.qe.DdlExecutor;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.VariableMgr;
@@ -106,6 +111,8 @@ import org.apache.doris.thrift.TGetBinlogResult;
 import org.apache.doris.thrift.TGetDbsParams;
 import org.apache.doris.thrift.TGetDbsResult;
 import org.apache.doris.thrift.TGetQueryStatsRequest;
+import org.apache.doris.thrift.TGetSnapshotRequest;
+import org.apache.doris.thrift.TGetSnapshotResult;
 import org.apache.doris.thrift.TGetTablesParams;
 import org.apache.doris.thrift.TGetTablesResult;
 import org.apache.doris.thrift.TGetTabletReplicaInfosRequest;
@@ -137,11 +144,14 @@ import org.apache.doris.thrift.TReplicaInfo;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TReportExecStatusResult;
 import org.apache.doris.thrift.TReportRequest;
+import org.apache.doris.thrift.TRestoreSnapshotRequest;
+import org.apache.doris.thrift.TRestoreSnapshotResult;
 import org.apache.doris.thrift.TRollbackTxnRequest;
 import org.apache.doris.thrift.TRollbackTxnResult;
 import org.apache.doris.thrift.TShowVariableRequest;
 import org.apache.doris.thrift.TShowVariableResult;
 import org.apache.doris.thrift.TSnapshotLoaderReportRequest;
+import org.apache.doris.thrift.TSnapshotType;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
@@ -2124,15 +2134,15 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             throw new UserException("prev_commit_seq is not set");
         }
 
+
+        // step 1: check auth
         String cluster = request.getCluster();
         if (Strings.isNullOrEmpty(cluster)) {
             cluster = SystemInfoService.DEFAULT_CLUSTER;
         }
-
-        // step 1: check auth
         if (Strings.isNullOrEmpty(request.getToken())) {
             checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTable(),
-                    request.getUserIp(), PrivPredicate.LOAD);
+                    request.getUserIp(), PrivPredicate.SELECT);
         }
 
         // step 3: check database
@@ -2181,4 +2191,182 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
         return result;
     }
+
+    // Return the meta and job info of a finished local backup (snapshot) identified by its label.
+    public TGetSnapshotResult getSnapshot(TGetSnapshotRequest request) throws TException {
+        String clientAddr = getClientAddrAsString();
+        LOG.trace("receive get snapshot info request: {}", request);
+
+        TGetSnapshotResult result = new TGetSnapshotResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            result = getSnapshotImpl(request, clientAddr);
+        } catch (UserException e) {
+            LOG.warn("failed to get snapshot info: {}", e.getMessage());
+            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+            status.addToErrorMsgs(e.getMessage());
+        } catch (Throwable e) {
+            LOG.warn("unexpected exception when getting snapshot info.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+            return result;
+        }
+
+        return result;
+    }
+
+    // getSnapshotImpl: validate the request, check auth, then look up the snapshot by label.
+    private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp)
+            throws UserException {
+        // Step 1: Check all required args: user, passwd, db, label_name, snapshot_name, snapshot_type
+        if (!request.isSetUser()) {
+            throw new UserException("user is not set");
+        }
+        if (!request.isSetPasswd()) {
+            throw new UserException("passwd is not set");
+        }
+        if (!request.isSetDb()) {
+            throw new UserException("db is not set");
+        }
+        if (!request.isSetLabelName()) {
+            throw new UserException("label_name is not set");
+        }
+        if (!request.isSetSnapshotName()) {
+            throw new UserException("snapshot_name is not set");
+        }
+        if (!request.isSetSnapshotType()) {
+            throw new UserException("snapshot_type is not set");
+        } else if (request.getSnapshotType() != TSnapshotType.LOCAL) {
+            throw new UserException("snapshot_type is not LOCAL");
+        }
+
+        // Step 2: check auth
+        String cluster = request.getCluster();
+        if (Strings.isNullOrEmpty(cluster)) {
+            cluster = SystemInfoService.DEFAULT_CLUSTER;
+        }
+
+        LOG.info("get snapshot info, user: {}, db: {}, label_name: {}, snapshot_name: {}, snapshot_type: {}",
+                request.getUser(), request.getDb(), request.getLabelName(), request.getSnapshotName(),
+                request.getSnapshotType());
+        if (Strings.isNullOrEmpty(request.getToken())) {
+            checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
+                    request.getTable(), clientIp, PrivPredicate.LOAD);
+        }
+
+        // Step 3: get snapshot
+        TGetSnapshotResult result = new TGetSnapshotResult();
+        result.setStatus(new TStatus(TStatusCode.OK));
+        Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName());
+        if (snapshot == null) {
+            result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
+            result.getStatus().addToErrorMsgs("snapshot not exist");
+        } else {
+            result.setMeta(snapshot.getMeta());
+            result.setJobInfo(snapshot.getJobInfo());
+        }
+
+        return result;
+    }
+
+    // Restore a local snapshot by executing a RESTORE statement built from the request.
+    public TRestoreSnapshotResult restoreSnapshot(TRestoreSnapshotRequest request) throws TException {
+        String clientAddr = getClientAddrAsString();
+        LOG.trace("receive restore snapshot info request: {}", request);
+
+        TRestoreSnapshotResult result = new TRestoreSnapshotResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            result = restoreSnapshotImpl(request, clientAddr);
+        } catch (UserException e) {
+            LOG.warn("failed to restore snapshot: {}", e.getMessage());
+            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+            status.addToErrorMsgs(e.getMessage());
+        } catch (Throwable e) {
+            LOG.warn("unexpected exception when restoring snapshot.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+            return result;
+        }
+
+        return result;
+    }
+
+    // restoreSnapshotImpl: validate the request, check auth, then build and execute a RestoreStmt.
+    private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest request, String clientIp)
+            throws UserException {
+        // Step 1: Check all required args: user, passwd, db, label_name, repo_name, meta, job_info
+        if (!request.isSetUser()) {
+            throw new UserException("user is not set");
+        }
+        if (!request.isSetPasswd()) {
+            throw new UserException("passwd is not set");
+        }
+        if (!request.isSetDb()) {
+            throw new UserException("db is not set");
+        }
+        if (!request.isSetLabelName()) {
+            throw new UserException("label_name is not set");
+        }
+        if (!request.isSetRepoName()) {
+            throw new UserException("repo_name is not set");
+        }
+        if (!request.isSetMeta()) {
+            throw new UserException("meta is not set");
+        }
+        if (!request.isSetJobInfo()) {
+            throw new UserException("job_info is not set");
+        }
+
+        // Step 2: check auth
+        String cluster = request.getCluster();
+        if (Strings.isNullOrEmpty(cluster)) {
+            cluster = SystemInfoService.DEFAULT_CLUSTER;
+        }
+
+        if (Strings.isNullOrEmpty(request.getToken())) {
+            checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
+                    request.getTable(), clientIp, PrivPredicate.LOAD);
+        }
+
+        // Step 3: restore snapshot
+        TRestoreSnapshotResult result = new TRestoreSnapshotResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+
+
+        LabelName label = new LabelName(request.getDb(), request.getLabelName());
+        String repoName = request.getRepoName();
+        Map<String, String> properties = request.getProperties();
+        RestoreStmt restoreStmt = new RestoreStmt(label, repoName, null, properties, request.getMeta(),
+                request.getJobInfo());
+        LOG.trace("restore snapshot info, restoreStmt: {}", restoreStmt);
+        try {
+            ConnectContext ctx = ConnectContext.get();
+            if (ctx == null) {
+                ctx = new ConnectContext();
+                ctx.setThreadLocalInfo();
+            }
+            ctx.setCluster(cluster);
+            ctx.setQualifiedUser(request.getUser());
+            UserIdentity currentUserIdentity = new UserIdentity(request.getUser(), "%");
+            ctx.setCurrentUserIdentity(currentUserIdentity);
+
+            Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
+            restoreStmt.analyze(analyzer);
+            DdlExecutor.execute(Env.getCurrentEnv(), restoreStmt);
+        } catch (UserException e) {
+            LOG.warn("failed to restore snapshot: {}", e.getMessage());
+            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+            status.addToErrorMsgs(e.getMessage());
+        } catch (Throwable e) {
+            LOG.warn("unexpected exception when restoring snapshot.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+        }
+
+        return result;
+    }
 }
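
A rough sketch of how an external tool might drive the two new RPCs with a plain Thrift client (illustrative only, not part of the patch; host, port, credentials, label, repo name and properties are placeholders, and the transport/protocol setup may differ depending on the FE thrift server configuration):

    import org.apache.doris.thrift.FrontendService;
    import org.apache.doris.thrift.TGetSnapshotRequest;
    import org.apache.doris.thrift.TGetSnapshotResult;
    import org.apache.doris.thrift.TRestoreSnapshotRequest;
    import org.apache.doris.thrift.TRestoreSnapshotResult;
    import org.apache.doris.thrift.TSnapshotType;
    import org.apache.doris.thrift.TStatusCode;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TSocket;

    import java.util.HashMap;
    import java.util.Map;

    public class SnapshotRpcSketch {
        public static void main(String[] args) throws Exception {
            // Fetch a finished local backup from the source FE.
            TSocket srcSocket = new TSocket("source_fe_host", 9020); // FE rpc_port, placeholder
            srcSocket.open();
            FrontendService.Client srcFe = new FrontendService.Client(new TBinaryProtocol(srcSocket));

            TGetSnapshotRequest getReq = new TGetSnapshotRequest();
            getReq.setUser("root");
            getReq.setPasswd("");
            getReq.setDb("example_db");
            getReq.setLabelName("backup_label_20230607");
            getReq.setSnapshotName("backup_label_20230607");
            getReq.setSnapshotType(TSnapshotType.LOCAL);
            TGetSnapshotResult snapshot = srcFe.getSnapshot(getReq);
            srcSocket.close();
            if (snapshot.getStatus().getStatusCode() != TStatusCode.OK) {
                throw new RuntimeException("getSnapshot failed: " + snapshot.getStatus());
            }

            // Replay it on the target FE; this ends up executing a RESTORE statement there.
            TSocket dstSocket = new TSocket("target_fe_host", 9020);
            dstSocket.open();
            FrontendService.Client dstFe = new FrontendService.Client(new TBinaryProtocol(dstSocket));

            Map<String, String> properties = new HashMap<>();
            properties.put("reserve_replica", "true"); // placeholder restore property

            TRestoreSnapshotRequest restoreReq = new TRestoreSnapshotRequest();
            restoreReq.setUser("root");
            restoreReq.setPasswd("");
            restoreReq.setDb("example_db");
            restoreReq.setLabelName("backup_label_20230607");
            restoreReq.setRepoName("placeholder_repo"); // placeholder repo name
            restoreReq.setProperties(properties);
            restoreReq.setMeta(snapshot.getMeta());
            restoreReq.setJobInfo(snapshot.getJobInfo());
            TRestoreSnapshotResult restored = dstFe.restoreSnapshot(restoreReq);
            dstSocket.close();
            System.out.println("restoreSnapshot status: " + restored.getStatus());
        }
    }
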
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
index 64b75a70d3..6482c5f807 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
@@ -21,9 +21,11 @@ import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.thrift.TDownloadReq;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TRemoteTabletSnapshot;
 import org.apache.doris.thrift.TResourceInfo;
 import org.apache.doris.thrift.TTaskType;
 
+import java.util.List;
 import java.util.Map;
 
 public class DownloadTask extends AgentTask {
@@ -34,6 +36,9 @@ public class DownloadTask extends AgentTask {
     private Map<String, String> brokerProperties;
     private StorageBackend.StorageType storageType;
     private String location;
+    private List<TRemoteTabletSnapshot> remoteTabletSnapshots;
+    private boolean isFromLocalSnapshot = false;
+
 
     public DownloadTask(TResourceInfo resourceInfo, long backendId, long signature, long jobId, long dbId,
             Map<String, String> srcToDestPath, FsBroker brokerAddr, Map<String, String> brokerProperties,
@@ -45,6 +50,16 @@ public class DownloadTask extends AgentTask {
         this.brokerProperties = brokerProperties;
         this.storageType = storageType;
         this.location = location;
+        this.isFromLocalSnapshot = false;
+    }
+
+    public DownloadTask(TResourceInfo resourceInfo, long backendId, long signature, long jobId, long dbId,
+                        List<TRemoteTabletSnapshot> remoteTabletSnapshots) {
+        super(resourceInfo, backendId, TTaskType.DOWNLOAD, dbId, -1, -1, -1, -1, signature);
+        this.jobId = jobId;
+        this.srcToDestPath = new java.util.HashMap<String, String>();
+        this.remoteTabletSnapshots = remoteTabletSnapshots;
+        this.isFromLocalSnapshot = true;
     }
 
     public long getJobId() {
@@ -64,11 +79,22 @@ public class DownloadTask extends AgentTask {
     }
 
     public TDownloadReq toThrift() {
-        TNetworkAddress address = new TNetworkAddress(brokerAddr.host, brokerAddr.port);
-        TDownloadReq req = new TDownloadReq(jobId, srcToDestPath, address);
-        req.setBrokerProp(brokerProperties);
-        req.setStorageBackend(storageType.toThrift());
-        req.setLocation(location);
+        // these fields are required
+        // 1: required i64 job_id
+        // 2: required map<string, string> src_dest_map
+        // 3: required Types.TNetworkAddress broker_addr
+        TDownloadReq req;
+        if (isFromLocalSnapshot) {
+            TNetworkAddress brokerAddr = new TNetworkAddress("", 0); // mock broker address
+            req = new TDownloadReq(jobId, srcToDestPath, brokerAddr);
+            req.setRemoteTabletSnapshots(remoteTabletSnapshots);
+        } else {
+            TNetworkAddress address = new TNetworkAddress(brokerAddr.host, brokerAddr.port);
+            req = new TDownloadReq(jobId, srcToDestPath, address);
+            req.setBrokerProp(brokerProperties);
+            req.setStorageBackend(storageType.toThrift());
+            req.setLocation(location);
+        }
         return req;
     }
 }
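
A small sketch of how the new broker-free constructor is used (illustrative only, not part of the patch; ids, paths, address and token are placeholders mirroring what RestoreJob.downloadLocalSnapshots() assembles):

    import java.util.Collections;

    import org.apache.doris.task.DownloadTask;
    import org.apache.doris.thrift.TDownloadReq;
    import org.apache.doris.thrift.TNetworkAddress;
    import org.apache.doris.thrift.TRemoteTabletSnapshot;

    public class LocalSnapshotDownloadSketch {
        public static TDownloadReq buildReq() {
            TRemoteTabletSnapshot snapshot = new TRemoteTabletSnapshot();
            snapshot.setLocalTabletId(10001L);
            snapshot.setLocalSnapshotPath("/path/to/local/snapshot/10001/1234567890");
            snapshot.setRemoteTabletId(20001L);
            snapshot.setRemoteBeId(10003L);
            snapshot.setRemoteBeAddr(new TNetworkAddress("remote_be_host", 8040)); // placeholder BE address
            snapshot.setRemoteSnapshotPath("/path/to/remote/snapshot/20001/1234567890");
            snapshot.setRemoteToken("cluster_token");

            long backendId = 10002L; // placeholder ids; the FE uses real ids and env.getNextId()
            long signature = 1L;
            long jobId = 2L;
            long dbId = 3L;
            DownloadTask task = new DownloadTask(null, backendId, signature, jobId, dbId,
                    Collections.singletonList(snapshot));
            // broker_addr is filled with the ("", 0) dummy; remote_tablet_snapshots carries the real sources
            return task.toThrift();
        }
    }
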
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index 6b74944cd7..b30c7ed26a 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -288,6 +288,16 @@ struct TUploadReq {
     6: optional string location // root path
 }
 
+struct TRemoteTabletSnapshot {
+    1: optional i64 local_tablet_id
+    2: optional string local_snapshot_path
+    3: optional i64 remote_tablet_id
+    4: optional i64 remote_be_id
+    5: optional Types.TNetworkAddress remote_be_addr
+    6: optional string remote_snapshot_path
+    7: optional string remote_token
+}
+
 struct TDownloadReq {
     1: required i64 job_id
     2: required map<string, string> src_dest_map
@@ -295,6 +305,7 @@ struct TDownloadReq {
     4: optional map<string, string> broker_prop
     5: optional Types.TStorageBackendType storage_backend = Types.TStorageBackendType.BROKER
     6: optional string location // root path
+    7: optional list<TRemoteTabletSnapshot> remote_tablet_snapshots
 }
 
 struct TSnapshotRequest {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 5aecdc4ba2..433b137a37 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -974,6 +974,52 @@ struct TGetTabletReplicaInfosResult {
     3: optional string token
 }
 
+enum TSnapshotType {
+    REMOTE = 0,
+    LOCAL  = 1,
+}
+
+struct TGetSnapshotRequest {
+    1: optional string cluster
+    2: optional string user
+    3: optional string passwd
+    4: optional string db
+    5: optional string table
+    6: optional string token
+    7: optional string label_name
+    8: optional string snapshot_name
+    9: optional TSnapshotType snapshot_type
+}
+
+struct TGetSnapshotResult {
+    1: optional Status.TStatus status
+    2: optional binary meta
+    3: optional binary job_info
+}
+
+struct TTableRef {
+    1: optional string table
+}
+
+struct TRestoreSnapshotRequest {
+    1: optional string cluster
+    2: optional string user
+    3: optional string passwd
+    4: optional string db
+    5: optional string table
+    6: optional string token
+    7: optional string label_name
+    8: optional string repo_name
+    9: optional list<TTableRef> table_refs
+    10: optional map<string, string> properties
+    11: optional binary meta
+    12: optional binary job_info
+}
+
+struct TRestoreSnapshotResult {
+    1: optional Status.TStatus status
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1006,6 +1052,8 @@ service FrontendService {
     TCommitTxnResult commitTxn(1: TCommitTxnRequest request)
     TRollbackTxnResult rollbackTxn(1: TRollbackTxnRequest request)
     TGetBinlogResult getBinlog(1: TGetBinlogRequest request)
+    TGetSnapshotResult getSnapshot(1: TGetSnapshotRequest request)
+    TRestoreSnapshotResult restoreSnapshot(1: TRestoreSnapshotRequest request)
 
     TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest request)
 
diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift
index 7edaecf7fb..0342f9bce0 100644
--- a/gensrc/thrift/Status.thrift
+++ b/gensrc/thrift/Status.thrift
@@ -91,6 +91,9 @@ enum TStatusCode {
     BINLOG_TOO_NEW_COMMIT_SEQ = 62,
     BINLOG_NOT_FOUND_DB = 63,
     BINLOG_NOT_FOUND_TABLE = 64,
+
+    // Snapshot Related from 70
+    SNAPSHOT_NOT_EXIST = 70,
 }
 
 struct TStatus {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org