You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/07 16:12:57 UTC
[doris] 03/13: [feature](backup-restore) Add local backup/restore not upload/download by broker (#20492)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 053c8c303f7047e67677eae1c0cc138f54913fb9
Author: Jack Drogon <ja...@gmail.com>
AuthorDate: Wed Jun 7 21:35:15 2023 +0800
[feature](backup-restore) Add local backup/restore not upload/download by broker (#20492)
---
be/src/agent/task_worker_pool.cpp | 28 ++-
be/src/runtime/snapshot_loader.cpp | 269 +++++++++++++++++++++
be/src/runtime/snapshot_loader.h | 5 +
be/src/service/backend_service.cpp | 14 +-
.../org/apache/doris/analysis/RestoreStmt.java | 32 ++-
.../apache/doris/analysis/ShowSnapshotStmt.java | 33 ++-
.../org/apache/doris/backup/BackupHandler.java | 132 +++++++---
.../java/org/apache/doris/backup/BackupJob.java | 25 +-
.../org/apache/doris/backup/BackupJobInfo.java | 93 ++++++-
.../java/org/apache/doris/backup/BackupMeta.java | 14 ++
.../java/org/apache/doris/backup/Repository.java | 2 +
.../java/org/apache/doris/backup/RestoreJob.java | 206 +++++++++++++++-
.../java/org/apache/doris/backup/Snapshot.java | 69 ++++++
.../apache/doris/service/FrontendServiceImpl.java | 194 ++++++++++++++-
.../java/org/apache/doris/task/DownloadTask.java | 36 ++-
gensrc/thrift/AgentService.thrift | 11 +
gensrc/thrift/FrontendService.thrift | 48 ++++
gensrc/thrift/Status.thrift | 3 +
18 files changed, 1147 insertions(+), 67 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index f596603c35..d830d592e2 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -761,20 +761,28 @@ void TaskWorkerPool::_download_worker_thread_callback() {
_tasks.pop_front();
}
LOG(INFO) << "get download task. signature=" << agent_task_req.signature
- << ", job_id=" << download_request.job_id;
+ << ", job_id=" << download_request.job_id
+ << "task detail: " << apache::thrift::ThriftDebugString(download_request);
// TODO: download
std::vector<int64_t> downloaded_tablet_ids;
- std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
- _env, download_request.job_id, agent_task_req.signature,
- download_request.broker_addr, download_request.broker_prop);
- Status status = loader->init(
- download_request.__isset.storage_backend ? download_request.storage_backend
- : TStorageBackendType::type::BROKER,
- download_request.__isset.location ? download_request.location : "");
- if (status.ok()) {
- status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids);
+ auto status = Status::OK();
+ if (download_request.__isset.remote_tablet_snapshots) {
+ SnapshotLoader loader(_env, download_request.job_id, agent_task_req.signature);
+ loader.remote_http_download(download_request.remote_tablet_snapshots,
+ &downloaded_tablet_ids);
+ } else {
+ std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
+ _env, download_request.job_id, agent_task_req.signature,
+ download_request.broker_addr, download_request.broker_prop);
+ status = loader->init(
+ download_request.__isset.storage_backend ? download_request.storage_backend
+ : TStorageBackendType::type::BROKER,
+ download_request.__isset.location ? download_request.location : "");
+ if (status.ok()) {
+ status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids);
+ }
}
if (!status.ok()) {
diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp
index 4db803b10e..f1b58fa454 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -34,9 +34,12 @@
#include <cstring>
#include <filesystem>
#include <istream>
+#include <unordered_map>
#include <utility>
#include "common/logging.h"
+#include "gutil/strings/split.h"
+#include "http/http_client.h"
#include "io/fs/broker_file_system.h"
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
@@ -370,6 +373,272 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
return status;
}
+Status SnapshotLoader::remote_http_download(
+ const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
+ std::vector<int64_t>* downloaded_tablet_ids) {
+ LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, task id: {}", _job_id,
+ _task_id);
+ constexpr uint32_t kListRemoteFileTimeout = 15;
+ constexpr uint32_t kDownloadFileMaxRetry = 3;
+ constexpr uint32_t kGetLengthTimeout = 10;
+
+ // check if job has already been cancelled
+ int tmp_counter = 1;
+ RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD));
+ Status status = Status::OK();
+
+ // Step 0 (placeholder, nothing implemented here yet): validate all remote snapshots
+
+ // Step 1: Validate local tablet snapshot paths
+ for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+ auto& path = remote_tablet_snapshot.local_snapshot_path;
+ bool res = true;
+ RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res));
+ if (!res) {
+ std::stringstream ss;
+ auto err_msg =
+ fmt::format("snapshot path is not directory or does not exist: {}", path);
+ LOG(WARNING) << err_msg;
+ return Status::RuntimeError(err_msg);
+ }
+ }
+
+ // Step 2: get all local files
+ struct LocalFileStat {
+ uint64_t size;
+ // TODO(Drogon): add md5sum
+ };
+ std::unordered_map<std::string, std::unordered_map<std::string, LocalFileStat>> local_files_map;
+ for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+ const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
+ std::vector<std::string> local_files;
+ RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files));
+
+ auto& local_filestat = local_files_map[local_path];
+ for (auto& local_file : local_files) {
+ // add file size
+ std::string local_file_path = local_path + "/" + local_file;
+ std::error_code ec;
+ uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
+ if (ec) {
+ LOG(WARNING) << "download file error" << ec.message();
+ return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path,
+ ec.message());
+ }
+ local_filestat[local_file] = {local_file_size};
+ }
+ }
+
+ // Step 3: Validate remote tablet snapshot paths && remote files map
+ // TODO(Drogon): Add md5sum check
+ // key is the remote snapshot path, value is the list of files under it
+ // all of these are fetched through the http download action
+ // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
+ int report_counter = 0;
+ int total_num = remote_tablet_snapshots.size();
+ int finished_num = 0;
+ struct RemoteFileStat {
+ // TODO(Drogon): Add md5sum
+ std::string url;
+ uint64_t size;
+ };
+ std::unordered_map<std::string, std::unordered_map<std::string, RemoteFileStat>>
+ remote_files_map;
+ for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+ const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
+ auto& remote_files = remote_files_map[remote_path];
+ const auto& token = remote_tablet_snapshot.remote_token;
+ const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
+
+ // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
+ std::string remote_url_prefix =
+ fmt::format("http://{}:{}/api/_tablet/_download?token={}&file={}",
+ remote_be_addr.hostname, remote_be_addr.port, token, remote_path);
+
+ string file_list_str;
+ auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) {
+ RETURN_IF_ERROR(client->init(remote_url_prefix));
+ client->set_timeout_ms(kListRemoteFileTimeout * 1000);
+ return client->execute(&file_list_str);
+ };
+ RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_files_cb));
+ std::vector<string> filename_list =
+ strings::Split(file_list_str, "\n", strings::SkipWhitespace());
+
+ for (const auto& filename : filename_list) {
+ std::string remote_file_url = fmt::format(
+ "http://{}:{}/api/_tablet/_download?token={}&file={}/{}",
+ remote_tablet_snapshot.remote_be_addr.hostname,
+ remote_tablet_snapshot.remote_be_addr.port, remote_tablet_snapshot.remote_token,
+ remote_tablet_snapshot.remote_snapshot_path, filename);
+
+ // get file length
+ uint64_t file_size = 0;
+ auto get_file_size_cb = [&remote_file_url, &file_size](HttpClient* client) {
+ RETURN_IF_ERROR(client->init(remote_file_url));
+ client->set_timeout_ms(kGetLengthTimeout * 1000);
+ RETURN_IF_ERROR(client->head());
+ RETURN_IF_ERROR(client->get_content_length(&file_size));
+ return Status::OK();
+ };
+ RETURN_IF_ERROR(
+ HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_size_cb));
+
+ remote_files[filename] = RemoteFileStat {remote_file_url, file_size};
+ }
+ }
+
+ // Step 4: Compare local and remote files && get all need download files
+ for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+ RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
+ TTaskType::type::DOWNLOAD));
+
+ const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
+ const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
+ auto& remote_files = remote_files_map[remote_path];
+ auto& local_files = local_files_map[local_path];
+ auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;
+
+ // get all need download files
+ std::vector<std::string> need_download_files;
+ for (const auto& [remote_file, remote_filestat] : remote_files) {
+ LOG(INFO) << fmt::format("remote file: {}, size: {}", remote_file,
+ remote_filestat.size);
+ auto it = local_files.find(remote_file);
+ if (it == local_files.end()) {
+ need_download_files.emplace_back(remote_file);
+ continue;
+ }
+ if (_end_with(remote_file, ".hdr")) {
+ need_download_files.emplace_back(remote_file);
+ continue;
+ }
+
+ if (auto& local_filestat = it->second; local_filestat.size != remote_filestat.size) {
+ need_download_files.emplace_back(remote_file);
+ continue;
+ }
+ // TODO(Drogon): check by md5sum, if not match then download
+
+ LOG(INFO) << fmt::format("file {} already exists, skip download", remote_file);
+ }
+
+ auto local_tablet_id = remote_tablet_snapshot.local_tablet_id;
+ TabletSharedPtr tablet =
+ _env->storage_engine()->tablet_manager()->get_tablet(local_tablet_id);
+ if (tablet == nullptr) {
+ std::stringstream ss;
+ ss << "failed to get local tablet: " << local_tablet_id;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+ DataDir* data_dir = tablet->data_dir();
+
+ // download all need download files
+ uint64_t total_file_size = 0;
+ MonotonicStopWatch watch;
+ watch.start();
+ for (auto& filename : need_download_files) {
+ auto& remote_filestat = remote_files[filename];
+ auto file_size = remote_filestat.size;
+ auto& remote_file_url = remote_filestat.url;
+
+ // check disk capacity
+ if (data_dir->reach_capacity_limit(file_size)) {
+ return Status::InternalError("Disk reach capacity limit");
+ }
+
+ total_file_size += file_size;
+ uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
+ if (estimate_timeout < config::download_low_speed_time) {
+ estimate_timeout = config::download_low_speed_time;
+ }
+
+ std::string local_filename;
+ RETURN_IF_ERROR(_replace_tablet_id(filename, local_tablet_id, &local_filename));
+ std::string local_file_path = local_path + "/" + local_filename;
+
+ LOG(INFO) << "clone begin to download file from: " << remote_file_url
+ << " to: " << local_file_path << ". size(B): " << file_size
+ << ", timeout(s): " << estimate_timeout;
+
+ auto download_cb = [&remote_file_url, estimate_timeout, &local_file_path,
+ file_size](HttpClient* client) {
+ RETURN_IF_ERROR(client->init(remote_file_url));
+ client->set_timeout_ms(estimate_timeout * 1000);
+ RETURN_IF_ERROR(client->download(local_file_path));
+
+ std::error_code ec;
+ // Check file length
+ uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
+ if (ec) {
+ LOG(WARNING) << "download file error" << ec.message();
+ return Status::IOError("can't retrive file_size of {}, due to {}",
+ local_file_path, ec.message());
+ }
+ if (local_file_size != file_size) {
+ LOG(WARNING) << "download file length error"
+ << ", remote_path=" << remote_file_url
+ << ", file_size=" << file_size
+ << ", local_file_size=" << local_file_size;
+ return Status::InternalError("downloaded file size is not equal");
+ }
+ chmod(local_file_path.c_str(), S_IRUSR | S_IWUSR);
+ return Status::OK();
+ };
+ RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb));
+
+ // local_files always keep the updated local files
+ local_files[filename] = LocalFileStat {file_size};
+ }
+
+ uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
+ total_time_ms = total_time_ms > 0 ? total_time_ms : 0;
+ double copy_rate = 0.0;
+ if (total_time_ms > 0) {
+ copy_rate = total_file_size / ((double)total_time_ms) / 1000;
+ }
+ LOG(INFO) << fmt::format(
+ "succeed to copy remote tablet {} to local tablet {}, total file size: {} B, cost: "
+ "{} ms, rate: {} MB/s",
+ remote_tablet_id, local_tablet_id, total_file_size, total_time_ms, copy_rate);
+
+ // local_files: contain all remote files and local files
+ // finally, delete local files which are not in remote
+ for (const auto& [local_file, local_filestat] : local_files) {
+ // replace the tablet id in local file name with the remote tablet id,
+ // in order to compare the file name.
+ std::string new_name;
+ Status st = _replace_tablet_id(local_file, remote_tablet_id, &new_name);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st
+ << ". ignore it";
+ continue;
+ }
+ VLOG_CRITICAL << "new file name after replace tablet id: " << new_name;
+ const auto& find = remote_files.find(new_name);
+ if (find != remote_files.end()) {
+ continue;
+ }
+
+ // delete
+ std::string full_local_file = local_path + "/" + local_file;
+ LOG(INFO) << "begin to delete local snapshot file: " << full_local_file
+ << ", it does not exist in remote";
+ if (remove(full_local_file.c_str()) != 0) {
+ LOG(WARNING) << "failed to delete unknown local file: " << full_local_file
+ << ", error: " << strerror(errno)
+ << ", file size: " << local_filestat.size << ", ignore it";
+ }
+ }
+
+ ++finished_num;
+ }
+
+ LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id;
+ return status;
+}
+
// move the snapshot files in snapshot_path
// to tablet_path
// If overwrite, just replace the tablet_path with snapshot_path,
diff --git a/be/src/runtime/snapshot_loader.h b/be/src/runtime/snapshot_loader.h
index 9e7a22d7a3..c0d1f0f708 100644
--- a/be/src/runtime/snapshot_loader.h
+++ b/be/src/runtime/snapshot_loader.h
@@ -33,6 +33,8 @@ namespace io {
class RemoteFileSystem;
} // namespace io
+class TRemoteTabletSnapshot;
+
struct FileStat {
std::string name;
std::string md5;
@@ -77,6 +79,9 @@ public:
Status download(const std::map<std::string, std::string>& src_to_dest_path,
std::vector<int64_t>* downloaded_tablet_ids);
+ Status remote_http_download(const std::vector<TRemoteTabletSnapshot>& remote_tablets,
+ std::vector<int64_t>* downloaded_tablet_ids);
+
Status move(const std::string& snapshot_path, TabletSharedPtr tablet, bool overwrite);
private:
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index a5e528b45f..1b4a1ec944 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -396,49 +396,49 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
}
/// Check args: txn_id, remote_tablet_id, binlog_version, remote_host, remote_port, partition_id, load_id
- if (request.__isset.txn_id) {
+ if (!request.__isset.txn_id) {
LOG(WARNING) << "txn_id is empty";
tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
tstatus.__isset.error_msgs = true;
tstatus.error_msgs.emplace_back("txn_id is empty");
return;
}
- if (request.__isset.remote_tablet_id) {
+ if (!request.__isset.remote_tablet_id) {
LOG(WARNING) << "remote_tablet_id is empty";
tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
tstatus.__isset.error_msgs = true;
tstatus.error_msgs.emplace_back("remote_tablet_id is empty");
return;
}
- if (request.__isset.binlog_version) {
+ if (!request.__isset.binlog_version) {
LOG(WARNING) << "binlog_version is empty";
tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
tstatus.__isset.error_msgs = true;
tstatus.error_msgs.emplace_back("binlog_version is empty");
return;
}
- if (request.__isset.remote_host) {
+ if (!request.__isset.remote_host) {
LOG(WARNING) << "remote_host is empty";
tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
tstatus.__isset.error_msgs = true;
tstatus.error_msgs.emplace_back("remote_host is empty");
return;
}
- if (request.__isset.remote_port) {
+ if (!request.__isset.remote_port) {
LOG(WARNING) << "remote_port is empty";
tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
tstatus.__isset.error_msgs = true;
tstatus.error_msgs.emplace_back("remote_port is empty");
return;
}
- if (request.__isset.partition_id) {
+ if (!request.__isset.partition_id) {
LOG(WARNING) << "partition_id is empty";
tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
tstatus.__isset.error_msgs = true;
tstatus.error_msgs.emplace_back("partition_id is empty");
return;
}
- if (request.__isset.load_id) {
+ if (!request.__isset.load_id) {
LOG(WARNING) << "load_id is empty";
tstatus.__set_status_code(TStatusCode::ANALYSIS_ERROR);
tstatus.__isset.error_msgs = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
index 679ebd8cb8..2382093d6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
@@ -17,6 +17,7 @@
package org.apache.doris.analysis;
+import org.apache.doris.backup.Repository;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
@@ -45,12 +46,22 @@ public class RestoreStmt extends AbstractBackupStmt {
private int metaVersion = -1;
private boolean reserveReplica = false;
private boolean reserveDynamicPartitionEnable = false;
+ private boolean isLocal = false;
+ private byte[] meta = null;
+ private byte[] jobInfo = null;
public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause,
Map<String, String> properties) {
super(labelName, repoName, restoreTableRefClause, properties);
}
+ public RestoreStmt(LabelName labelName, String repoName, AbstractBackupTableRefClause restoreTableRefClause,
+ Map<String, String> properties, byte[] meta, byte[] jobInfo) {
+ super(labelName, repoName, restoreTableRefClause, properties);
+ this.meta = meta;
+ this.jobInfo = jobInfo;
+ }
+
public boolean allowLoad() {
return allowLoad;
}
@@ -75,8 +86,23 @@ public class RestoreStmt extends AbstractBackupStmt {
return reserveDynamicPartitionEnable;
}
+ public boolean isLocal() {
+ return isLocal;
+ }
+
+ public byte[] getMeta() {
+ return meta;
+ }
+
+ public byte[] getJobInfo() {
+ return jobInfo;
+ }
+
@Override
public void analyze(Analyzer analyzer) throws UserException {
+ if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
+ isLocal = true;
+ }
super.analyze(analyzer);
}
@@ -148,8 +174,10 @@ public class RestoreStmt extends AbstractBackupStmt {
backupTimestamp = copiedProperties.get(PROP_BACKUP_TIMESTAMP);
copiedProperties.remove(PROP_BACKUP_TIMESTAMP);
} else {
- ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
- "Missing " + PROP_BACKUP_TIMESTAMP + " property");
+ if (!isLocal) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
+ "Missing " + PROP_BACKUP_TIMESTAMP + " property");
+ }
}
// meta version
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java
index bdf33ddfec..d10d216b12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSnapshotStmt.java
@@ -28,6 +28,11 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
public class ShowSnapshotStmt extends ShowStmt {
+ public enum SnapshotType {
+ REMOTE,
+ LOCAL
+ }
+
public static final ImmutableList<String> SNAPSHOT_ALL = new ImmutableList.Builder<String>()
.add("Snapshot").add("Timestamp").add("Status")
.build();
@@ -39,6 +44,7 @@ public class ShowSnapshotStmt extends ShowStmt {
private Expr where;
private String snapshotName;
private String timestamp;
+ private SnapshotType snapshotType = SnapshotType.REMOTE;
public ShowSnapshotStmt(String repoName, Expr where) {
this.repoName = repoName;
@@ -87,7 +93,7 @@ public class ShowSnapshotStmt extends ShowStmt {
if (!ok) {
throw new AnalysisException("Where clause should looks like: SNAPSHOT = 'your_snapshot_name'"
- + " [AND TIMESTAMP = '2018-04-18-19-19-10']");
+ + " [AND TIMESTAMP = '2018-04-18-19-19-10'] [AND SNAPSHOTTYPE = 'remote' | 'local']");
}
}
}
@@ -116,10 +122,25 @@ public class ShowSnapshotStmt extends ShowStmt {
return false;
}
return true;
+ } else if (name.equalsIgnoreCase("snapshotType")) {
+ String snapshotTypeVal = ((StringLiteral) val).getStringValue();
+ if (Strings.isNullOrEmpty(snapshotTypeVal)) {
+ return false;
+ }
+ // snapshotType currently only supports "remote" and "local"
+ switch (snapshotTypeVal.toLowerCase()) {
+ case "remote":
+ snapshotType = SnapshotType.REMOTE;
+ return true;
+ case "local":
+ snapshotType = SnapshotType.LOCAL;
+ return true;
+ default:
+ return false;
+ }
+ } else {
+ return false;
}
-
- return false;
-
}
public String getRepoName() {
@@ -134,6 +155,10 @@ public class ShowSnapshotStmt extends ShowStmt {
return timestamp;
}
+ public String getSnapshotType() {
+ return snapshotType.name();
+ }
+
@Override
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index d149ad574d..6619c457b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -46,6 +46,7 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.task.DirMoveTask;
@@ -56,13 +57,16 @@ import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.ByteArrayInputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
@@ -75,7 +79,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -104,6 +110,12 @@ public class BackupHandler extends MasterDaemon implements Writable {
private Env env;
+ // Map that stores backup info: key is the label name, value is a Snapshot (meta and job info, both as bytes).
+ // This map is not persisted; it only exists in the FE master's memory.
+ // Only one snapshot info is kept per table: the most recent one.
+ private final Map<String, Snapshot> localSnapshots = new HashMap<>();
+ private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock();
+
public BackupHandler() {
// for persist
}
@@ -241,9 +253,13 @@ public class BackupHandler extends MasterDaemon implements Writable {
public void process(AbstractBackupStmt stmt) throws DdlException {
// check if repo exist
String repoName = stmt.getRepoName();
- Repository repository = repoMgr.getRepo(repoName);
- if (repository == null) {
- ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository " + repoName + " does not exist");
+ Repository repository = null;
+ if (!repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
+ repository = repoMgr.getRepo(repoName);
+ if (repository == null) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+ "Repository " + repoName + " does not exist");
+ }
}
// check if db exist
@@ -286,7 +302,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
}
private void backup(Repository repository, Database db, BackupStmt stmt) throws DdlException {
- if (repository.isReadOnly()) {
+ if (repository != null && repository.isReadOnly()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository " + repository.getName()
+ " is read only");
}
@@ -357,25 +373,29 @@ public class BackupHandler extends MasterDaemon implements Writable {
}
// Check if label already be used
- List<String> existSnapshotNames = Lists.newArrayList();
- Status st = repository.listSnapshots(existSnapshotNames);
- if (!st.ok()) {
- ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, st.getErrMsg());
- }
- if (existSnapshotNames.contains(stmt.getLabel())) {
- if (stmt.getType() == BackupType.FULL) {
- ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Snapshot with name '"
- + stmt.getLabel() + "' already exist in repository");
- } else {
- ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Currently does not support "
- + "incremental backup");
+ long repoId = -1;
+ if (repository != null) {
+ List<String> existSnapshotNames = Lists.newArrayList();
+ Status st = repository.listSnapshots(existSnapshotNames);
+ if (!st.ok()) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, st.getErrMsg());
+ }
+ if (existSnapshotNames.contains(stmt.getLabel())) {
+ if (stmt.getType() == BackupType.FULL) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Snapshot with name '"
+ + stmt.getLabel() + "' already exist in repository");
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Currently does not support "
+ + "incremental backup");
+ }
}
+ repoId = repository.getId();
}
// Create a backup job
BackupJob backupJob = new BackupJob(stmt.getLabel(), db.getId(),
ClusterNamespace.getNameFromFullName(db.getFullName()),
- tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repository.getId());
+ tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId);
// write log
env.getEditLog().logBackupJob(backupJob);
@@ -386,26 +406,62 @@ public class BackupHandler extends MasterDaemon implements Writable {
}
private void restore(Repository repository, Database db, RestoreStmt stmt) throws DdlException {
- // Check if snapshot exist in repository
- List<BackupJobInfo> infos = Lists.newArrayList();
- Status status = repository.getSnapshotInfoFile(stmt.getLabel(), stmt.getBackupTimestamp(), infos);
- if (!status.ok()) {
- ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
- "Failed to get info of snapshot '" + stmt.getLabel() + "' because: "
- + status.getErrMsg() + ". Maybe specified wrong backup timestamp");
+ BackupJobInfo jobInfo;
+ if (stmt.isLocal()) {
+ String jobInfoString = new String(stmt.getJobInfo());
+ jobInfo = BackupJobInfo.genFromJson(jobInfoString);
+
+ if (jobInfo.extraInfo == null) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info empty");
+ }
+ if (jobInfo.extraInfo.beNetworkMap == null) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info be network map");
+ }
+ if (Strings.isNullOrEmpty(jobInfo.extraInfo.token)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info token");
+ }
+ } else {
+ // Check if snapshot exist in repository
+ List<BackupJobInfo> infos = Lists.newArrayList();
+ Status status = repository.getSnapshotInfoFile(stmt.getLabel(), stmt.getBackupTimestamp(), infos);
+ if (!status.ok()) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+ "Failed to get info of snapshot '" + stmt.getLabel() + "' because: "
+ + status.getErrMsg() + ". Maybe specified wrong backup timestamp");
+ }
+
+ // Check if all restore objects are exist in this snapshot.
+ // Also remove all unrelated objs
+ Preconditions.checkState(infos.size() == 1);
+ jobInfo = infos.get(0);
}
- // Check if all restore objects are exist in this snapshot.
- // Also remove all unrelated objs
- Preconditions.checkState(infos.size() == 1);
- BackupJobInfo jobInfo = infos.get(0);
checkAndFilterRestoreObjsExistInSnapshot(jobInfo, stmt.getAbstractBackupTableRefClause());
// Create a restore job
- RestoreJob restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
+ RestoreJob restoreJob;
+ if (stmt.isLocal()) {
+ ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(stmt.getMeta());
+ DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
+ try {
+ BackupMeta backupMeta = BackupMeta.read(dataInputStream);
+ String backupTimestamp =
+ TimeUtils.longToTimeString(jobInfo.getBackupTime(), TimeUtils.DATETIME_FORMAT_WITH_HYPHEN);
+ restoreJob = new RestoreJob(stmt.getLabel(), backupTimestamp,
+ db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
+ stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(),
+ stmt.reserveDynamicPartitionEnable(),
+ env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
+ } catch (IOException e) {
+ throw new DdlException(e.getMessage());
+ }
+ } else {
+ restoreJob = new RestoreJob(stmt.getLabel(), stmt.getBackupTimestamp(),
db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), stmt.getReplicaAlloc(),
stmt.getTimeoutMs(), stmt.getMetaVersion(), stmt.reserveReplica(), stmt.reserveDynamicPartitionEnable(),
env, repository.getId());
+ }
+
env.getEditLog().logRestoreJob(restoreJob);
// must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed.
@@ -667,6 +723,24 @@ public class BackupHandler extends MasterDaemon implements Writable {
return false;
}
+ public void addSnapshot(String labelName, Snapshot snapshot) {
+ localSnapshotsLock.writeLock().lock();
+ try {
+ localSnapshots.put(labelName, snapshot);
+ } finally {
+ localSnapshotsLock.writeLock().unlock();
+ }
+ }
+
+ public Snapshot getSnapshot(String labelName) {
+ localSnapshotsLock.readLock().lock();
+ try {
+ return localSnapshots.get(labelName);
+ } finally {
+ localSnapshotsLock.readLock().unlock();
+ }
+ }
+
public static BackupHandler read(DataInput in) throws IOException {
BackupHandler backupHandler = new BackupHandler();
backupHandler.readFields(in);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 2300cd1e01..60d486c310 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -114,6 +114,9 @@ public class BackupJob extends AbstractJob {
// backup properties
private Map<String, String> properties = Maps.newHashMap();
+ private byte[] metaInfoBytes = null;
+ private byte[] jobInfoBytes = null;
+
public BackupJob() {
super(JobType.BACKUP);
}
@@ -282,7 +285,7 @@ public class BackupJob extends AbstractJob {
}
// get repo if not set
- if (repo == null) {
+ if (repo == null && repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
repo = env.getBackupHandler().getRepoMgr().getRepo(repoId);
if (repo == null) {
status = new Status(ErrCode.COMMON_ERROR, "failed to get repository: " + repoId);
@@ -565,6 +568,11 @@ public class BackupJob extends AbstractJob {
}
private void uploadSnapshot() {
+ if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
+ state = BackupJobState.UPLOADING;
+ return;
+ }
+
// reuse this set to save all unfinished tablets
unfinishedTaskIds.clear();
taskProgress.clear();
@@ -673,6 +681,8 @@ public class BackupJob extends AbstractJob {
}
backupMeta.writeToFile(metaInfoFile);
localMetaInfoFilePath = metaInfoFile.getAbsolutePath();
+ // read meta info to metaInfoBytes
+ metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
// 3. save job info file
jobInfo = BackupJobInfo.fromCatalog(createTime, label, dbName, dbId,
@@ -685,6 +695,8 @@ public class BackupJob extends AbstractJob {
}
jobInfo.writeToFile(jobInfoFile);
localJobInfoFilePath = jobInfoFile.getAbsolutePath();
+ // read job info to jobInfoBytes
+ jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
} catch (Exception e) {
status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage());
return;
@@ -697,7 +709,9 @@ public class BackupJob extends AbstractJob {
jobInfo = null;
// release all snapshots before clearing the snapshotInfos.
- releaseSnapshots();
+ if (repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
+ releaseSnapshots();
+ }
snapshotInfos.clear();
@@ -724,6 +738,13 @@ public class BackupJob extends AbstractJob {
}
private void uploadMetaAndJobInfoFile() {
+ if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
+ state = BackupJobState.FINISHED;
+ Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
+ env.getBackupHandler().addSnapshot(label, snapshot);
+ return;
+ }
+
String remoteMetaInfoFile = repo.assembleMetaInfoFilePath(label);
if (!uploadFile(localMetaInfoFilePath, remoteMetaInfoFile)) {
return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
index ec622dbdca..4457740440 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
@@ -36,6 +36,7 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
@@ -92,10 +93,39 @@ public class BackupJobInfo implements Writable {
@SerializedName("meta_version")
public int metaVersion;
+ @SerializedName("tablet_be_map")
+ public Map<Long, Long> tabletBeMap = Maps.newHashMap();
+
+ @SerializedName("tablet_snapshot_path_map")
+ public Map<Long, String> tabletSnapshotPathMap = Maps.newHashMap();
+
+ public static class ExtraInfo {
+ public static class NetworkAddrss {
+ @SerializedName("ip")
+ public String ip;
+ @SerializedName("port")
+ public int port;
+ }
+
+ @SerializedName("be_network_map")
+ public Map<Long, NetworkAddrss> beNetworkMap = Maps.newHashMap();
+
+ @SerializedName("token")
+ public String token;
+ }
+
+ @SerializedName("extra_info")
+ public ExtraInfo extraInfo;
+
+
// This map is used to save the table alias mapping info when processing a restore job.
// origin -> alias
public Map<String, String> tblAlias = Maps.newHashMap();
+ public long getBackupTime() {
+ return backupTime;
+ }
+
public void initBackupJobInfoAfterDeserialize() {
// transform success
if (successJson.equals("succeed")) {
@@ -487,6 +517,62 @@ public class BackupJobInfo implements Writable {
return Joiner.on("/").join(pathSeg);
}
+ // The accessors below supply the remote_* fields of TRemoteTabletSnapshot (AgentService.thrift):
+ // struct TRemoteTabletSnapshot {
+ // 1: optional i64 local_tablet_id
+ // 2: optional string local_snapshot_path
+ // 3: optional i64 remote_tablet_id
+ // 4: optional i64 remote_be_id
+ // 5: optional Types.TNetworkAddress remote_be_addr
+ // 6: optional string remote_snapshot_path
+ // 7: optional string remote_token
+ // }
+
+ public String getTabletSnapshotPath(Long tabletId) {
+ return tabletSnapshotPathMap.get(tabletId);
+ }
+
+ public Long getBeId(Long tabletId) {
+ return tabletBeMap.get(tabletId);
+ }
+
+ /**
+  * Returns the token stored in {@code extraInfo}.
+  *
+  * @return the token, or null when this job info carries no extra info
+  */
+ public String getToken() {
+     // extraInfo is declared without an initializer, so it may still be null here.
+     return extraInfo == null ? null : extraInfo.token;
+ }
+
+ /**
+  * Looks up the network address recorded for a backend in {@code extraInfo.beNetworkMap}.
+  *
+  * @param beId id of the backend to look up
+  * @return the address of that backend, or null when no extra info or no entry for
+  *         {@code beId} is available
+  */
+ public TNetworkAddress getBeAddr(Long beId) {
+     // extraInfo is declared without an initializer, so guard against it being null
+     // instead of failing with a NullPointerException.
+     if (extraInfo == null || extraInfo.beNetworkMap == null) {
+         return null;
+     }
+
+     ExtraInfo.NetworkAddrss addr = extraInfo.beNetworkMap.get(beId);
+     if (addr == null) {
+         return null;
+     }
+
+     return new TNetworkAddress(addr.ip, addr.port);
+ }
+
+ // TODO(Drogon): improve the performance of this lookup
+ /**
+  * Finds the schema hash of the index identified by (tableId, partitionId, indexId)
+  * by scanning the backed-up olap tables, their partitions and their indexes.
+  *
+  * @return the schema hash of the first matching index, or null if none matches
+  */
+ public Long getSchemaHash(long tableId, long partitionId, long indexId) {
+     for (BackupOlapTableInfo tblInfo : backupOlapTableObjects.values()) {
+         if (tblInfo.id == tableId) {
+             for (BackupPartitionInfo partInfo : tblInfo.partitions.values()) {
+                 if (partInfo.id == partitionId) {
+                     for (BackupIndexInfo idxInfo : partInfo.indexes.values()) {
+                         if (idxInfo.id == indexId) {
+                             return Long.valueOf(idxInfo.schemaHash);
+                         }
+                     }
+                 }
+             }
+         }
+     }
+     // no table/partition/index with the requested ids
+     return null;
+ }
+
public static BackupJobInfo fromCatalog(long backupTime, String label, String dbName, long dbId,
BackupContent content, BackupMeta backupMeta,
Map<Long, SnapshotInfo> snapshotInfos) {
@@ -526,8 +612,11 @@ public class BackupJobInfo implements Writable {
}
} else {
for (Tablet tablet : index.getTablets()) {
+ SnapshotInfo snapshotInfo = snapshotInfos.get(tablet.getId());
idxInfo.tablets.put(tablet.getId(),
- Lists.newArrayList(snapshotInfos.get(tablet.getId()).getFiles()));
+ Lists.newArrayList(snapshotInfo.getFiles()));
+ jobInfo.tabletBeMap.put(tablet.getId(), snapshotInfo.getBeId());
+ jobInfo.tabletSnapshotPathMap.put(tablet.getId(), snapshotInfo.getPath());
}
}
idxInfo.tabletsOrder.addAll(index.getTabletIdsInOrder());
@@ -578,7 +667,7 @@ public class BackupJobInfo implements Writable {
return genFromJson(json);
}
- private static BackupJobInfo genFromJson(String json) {
+ public static BackupJobInfo genFromJson(String json) {
/* parse the json string:
* {
* "backup_time": 1522231864000,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
index e059d55be6..e22bb7f33c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
@@ -21,8 +21,10 @@ import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.io.Writable;
import org.apache.doris.meta.MetaContext;
+import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataInputStream;
@@ -38,10 +40,13 @@ import java.util.Map;
public class BackupMeta implements Writable {
// tbl name -> tbl
+ @SerializedName(value = "tblNameMap")
private Map<String, Table> tblNameMap = Maps.newHashMap();
// tbl id -> tbl
+ @SerializedName(value = "tblIdMap")
private Map<Long, Table> tblIdMap = Maps.newHashMap();
// resource name -> resource
+ @SerializedName(value = "resourceNameMap")
private Map<String, Resource> resourceNameMap = Maps.newHashMap();
private BackupMeta() {
@@ -136,4 +141,13 @@ public class BackupMeta implements Writable {
resourceNameMap.put(resource.getName(), resource);
}
}
+
+ public String toJson() {
+ return GsonUtils.GSON.toJson(this);
+ }
+
+ @Override
+ public String toString() {
+ return toJson();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
index 6d421332dc..ba95a77352 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
@@ -97,6 +97,8 @@ public class Repository implements Writable {
public static final String FILE_REPO_INFO = "__repo_info";
public static final String FILE_META_INFO = "__meta";
public static final String DIR_SNAPSHOT_CONTENT = "__ss_content";
+ public static final String KEEP_ON_LOCAL_REPO_NAME = "__keep_on_local__";
+ public static final long KEEP_ON_LOCAL_REPO_ID = -1;
private static final Logger LOG = LogManager.getLogger(Repository.class);
private static final String PATH_DELIMITER = "/";
private static final String CHECKSUM_SEPARATOR = ".";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 60ff99c4b9..e9e2eee824 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -71,6 +71,8 @@ import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.ReleaseSnapshotTask;
import org.apache.doris.task.SnapshotTask;
import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TRemoteTabletSnapshot;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
@@ -185,6 +187,18 @@ public class RestoreJob extends AbstractJob {
properties.put(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, String.valueOf(reserveDynamicPartitionEnable));
}
+ public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad,
+ ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica,
+ boolean reserveDynamicPartitionEnable, Env env, long repoId, BackupMeta backupMeta) {
+ this(label, backupTs, dbId, dbName, jobInfo, allowLoad, replicaAlloc, timeoutMs, metaVersion, reserveReplica,
+ reserveDynamicPartitionEnable, env, repoId);
+ this.backupMeta = backupMeta;
+ }
+
+ public boolean isFromLocalSnapshot() {
+ return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
+ }
+
public RestoreJobState getState() {
return state;
}
@@ -324,7 +338,7 @@ public class RestoreJob extends AbstractJob {
}
// get repo if not set
- if (repo == null) {
+ if (repo == null && !isFromLocalSnapshot()) {
repo = env.getBackupHandler().getRepoMgr().getRepo(repoId);
if (repo == null) {
status = new Status(ErrCode.COMMON_ERROR, "failed to get repository: " + repoId);
@@ -1109,6 +1123,15 @@ public class RestoreJob extends AbstractJob {
}
private boolean downloadAndDeserializeMetaInfo() {
+ if (isFromLocalSnapshot()) {
+ if (backupMeta != null) {
+ return true;
+ }
+
+ status = new Status(ErrCode.COMMON_ERROR, "backupMeta is null");
+ return false;
+ }
+
List<BackupMeta> backupMetas = Lists.newArrayList();
Status st = repo.getSnapshotMetaFile(jobInfo.name, backupMetas,
this.metaVersion == -1 ? jobInfo.metaVersion : this.metaVersion);
@@ -1251,7 +1274,15 @@ public class RestoreJob extends AbstractJob {
}
private void downloadSnapshots() {
- // Categorize snapshot infos by db id.
+ if (isFromLocalSnapshot()) {
+ downloadLocalSnapshots();
+ } else {
+ downloadRemoteSnapshots();
+ }
+ }
+
+ private void downloadRemoteSnapshots() {
+ // Categorize snapshot infos by db id.
ArrayListMultimap<Long, SnapshotInfo> dbToSnapshotInfos = ArrayListMultimap.create();
for (SnapshotInfo info : snapshotInfos.values()) {
dbToSnapshotInfos.put(info.getDbId(), info);
@@ -1289,7 +1320,8 @@ public class RestoreJob extends AbstractJob {
LOG.debug("backend {} has {} batch, total {} tasks, {}",
beId, batchNum, totalNum, this);
- List<FsBroker> brokerAddrs = Lists.newArrayList();
+ List<FsBroker> brokerAddrs = null;
+ brokerAddrs = Lists.newArrayList();
Status st = repo.getBrokerAddress(beId, env, brokerAddrs);
if (!st.ok()) {
status = st;
@@ -1401,6 +1433,174 @@ public class RestoreJob extends AbstractJob {
LOG.info("finished to send download tasks to BE. num: {}. {}", batchTask.getTaskNum(), this);
}
+ /**
+  * Sends download tasks for a restore whose source is a local snapshot.
+  *
+  * For every local snapshot info this builds a TRemoteTabletSnapshot that pairs the local
+  * snapshot dir with the source tablet id, BE id, BE address, snapshot path and token
+  * recorded in jobInfo. The entries of one backend are split into at most 3 DownloadTasks.
+  *
+  * On any failed lookup, sets {@code status} and returns before any task is submitted.
+  * On success, submits all tasks and moves the job to DOWNLOADING.
+  */
+ private void downloadLocalSnapshots() {
+     // Categorize snapshot infos by db id.
+     ArrayListMultimap<Long, SnapshotInfo> dbToSnapshotInfos = ArrayListMultimap.create();
+     for (SnapshotInfo info : snapshotInfos.values()) {
+         dbToSnapshotInfos.put(info.getDbId(), info);
+     }
+
+     // Send download tasks
+     unfinishedSignatureToId.clear();
+     taskProgress.clear();
+     taskErrMsg.clear();
+     AgentBatchTask batchTask = new AgentBatchTask();
+     for (long dbId : dbToSnapshotInfos.keySet()) {
+         List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
+
+         Database db = env.getInternalCatalog().getDbNullable(dbId);
+         if (db == null) {
+             status = new Status(ErrCode.NOT_FOUND, "db " + dbId + " does not exist");
+             return;
+         }
+
+         // We classify the snapshot info by backend
+         ArrayListMultimap<Long, SnapshotInfo> beToSnapshots = ArrayListMultimap.create();
+         for (SnapshotInfo info : infos) {
+             beToSnapshots.put(info.getBeId(), info);
+         }
+
+         db.readLock();
+         try {
+             for (Long beId : beToSnapshots.keySet()) {
+                 List<SnapshotInfo> beSnapshotInfos = beToSnapshots.get(beId);
+                 int totalNum = beSnapshotInfos.size();
+                 // each backend allot at most 3 tasks
+                 int batchNum = Math.min(totalNum, 3);
+                 // each task contains several download sub tasks
+                 int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
+
+                 // allot tasks
+                 int index = 0;
+                 for (int batch = 0; batch < batchNum; batch++) {
+                     List<TRemoteTabletSnapshot> remoteTabletSnapshots = Lists.newArrayList();
+                     // the last batch takes whatever is left over
+                     int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch;
+                     for (int j = 0; j < currentBatchTaskNum; j++) {
+                         TRemoteTabletSnapshot remoteTabletSnapshot = new TRemoteTabletSnapshot();
+
+                         SnapshotInfo info = beSnapshotInfos.get(index++);
+                         Table tbl = db.getTableNullable(info.getTblId());
+                         if (tbl == null) {
+                             // report the table id that was looked up, not the tablet id
+                             status = new Status(ErrCode.NOT_FOUND, "restored table "
+                                     + info.getTblId() + " does not exist");
+                             return;
+                         }
+                         OlapTable olapTbl = (OlapTable) tbl;
+                         olapTbl.readLock();
+                         try {
+                             Partition part = olapTbl.getPartition(info.getPartitionId());
+                             if (part == null) {
+                                 status = new Status(ErrCode.NOT_FOUND, "partition "
+                                         + info.getPartitionId() + " does not exist in restored table: "
+                                         + tbl.getName());
+                                 return;
+                             }
+
+                             MaterializedIndex idx = part.getIndex(info.getIndexId());
+                             if (idx == null) {
+                                 status = new Status(ErrCode.NOT_FOUND, "index " + info.getIndexId()
+                                         + " does not exist in partition " + part.getName()
+                                         + " of restored table " + tbl.getName());
+                                 return;
+                             }
+
+                             Tablet tablet = idx.getTablet(info.getTabletId());
+                             if (tablet == null) {
+                                 status = new Status(ErrCode.NOT_FOUND,
+                                         "tablet " + info.getTabletId() + " does not exist in restored table "
+                                                 + tbl.getName());
+                                 return;
+                             }
+
+                             Replica replica = tablet.getReplicaByBackendId(info.getBeId());
+                             if (replica == null) {
+                                 status = new Status(ErrCode.NOT_FOUND,
+                                         "replica in be " + info.getBeId() + " of tablet "
+                                                 + tablet.getId() + " does not exist in restored table "
+                                                 + tbl.getName());
+                                 return;
+                             }
+
+                             // map the ids of the restored replica to the ids recorded in the backup
+                             IdChain catalogIds = new IdChain(tbl.getId(), part.getId(), idx.getId(),
+                                     info.getTabletId(), replica.getId());
+                             IdChain repoIds = fileMapping.get(catalogIds);
+                             if (repoIds == null) {
+                                 status = new Status(ErrCode.NOT_FOUND,
+                                         "failed to get id mapping of catalog ids: " + catalogIds.toString());
+                                 return;
+                             }
+
+                             SnapshotInfo snapshotInfo = snapshotInfos.get(info.getTabletId(), info.getBeId());
+                             Preconditions.checkNotNull(snapshotInfo, info.getTabletId() + "-" + info.getBeId());
+                             // download to previous exist snapshot dir
+                             String dest = snapshotInfo.getTabletPath();
+
+                             Long localTabletId = info.getTabletId();
+                             String localSnapshotPath = dest;
+                             Long remoteTabletId = repoIds.getTabletId();
+                             Long remoteBeId = jobInfo.getBeId(remoteTabletId);
+                             String remoteSnapshotPath = jobInfo.getTabletSnapshotPath(remoteTabletId);
+                             if (remoteSnapshotPath == null) {
+                                 status = new Status(ErrCode.NOT_FOUND,
+                                         "failed to get remote snapshot path of tablet: " + remoteTabletId);
+                                 return;
+                             }
+                             Long schemaHash = jobInfo.getSchemaHash(
+                                     repoIds.getTblId(), repoIds.getPartId(), repoIds.getIdxId());
+                             if (schemaHash == null) {
+                                 status = new Status(ErrCode.NOT_FOUND,
+                                         "failed to get schema hash of table: " + repoIds.getTblId()
+                                                 + ", partition: " + repoIds.getPartId()
+                                                 + ", index: " + repoIds.getIdxId());
+                                 return;
+                             }
+                             // remoteSnapshotPath = "${remoteSnapshotPath}/${remoteTabletId}/${schemaHash}"
+                             remoteSnapshotPath =
+                                     String.format("%s/%d/%d", remoteSnapshotPath, remoteTabletId, schemaHash);
+                             TNetworkAddress remoteBeAddr = jobInfo.getBeAddr(remoteBeId);
+                             if (remoteBeAddr == null) {
+                                 status = new Status(ErrCode.NOT_FOUND,
+                                         "failed to get remote be address of be: " + remoteBeId);
+                                 return;
+                             }
+                             String remoteToken = jobInfo.getToken();
+
+                             remoteTabletSnapshot.setLocalTabletId(localTabletId);
+                             remoteTabletSnapshot.setLocalSnapshotPath(localSnapshotPath);
+                             remoteTabletSnapshot.setRemoteTabletId(remoteTabletId);
+                             remoteTabletSnapshot.setRemoteBeId(remoteBeId);
+                             remoteTabletSnapshot.setRemoteBeAddr(remoteBeAddr);
+                             remoteTabletSnapshot.setRemoteSnapshotPath(remoteSnapshotPath);
+                             remoteTabletSnapshot.setRemoteToken(remoteToken);
+
+                             remoteTabletSnapshots.add(remoteTabletSnapshot);
+                         } finally {
+                             olapTbl.readUnlock();
+                         }
+                     }
+                     // one DownloadTask per batch, tracked by its signature until it finishes
+                     long signature = env.getNextId();
+                     DownloadTask task = new DownloadTask(null, beId, signature, jobId, dbId, remoteTabletSnapshots);
+                     batchTask.addTask(task);
+                     unfinishedSignatureToId.put(signature, beId);
+                 }
+             }
+         } finally {
+             db.readUnlock();
+         }
+     }
+
+     // send task
+     for (AgentTask task : batchTask.getAllTasks()) {
+         AgentTaskQueue.addTask(task);
+     }
+     AgentTaskExecutor.submit(batchTask);
+
+     state = RestoreJobState.DOWNLOADING;
+
+     // No edit log here
+     LOG.info("finished to send download tasks to BE. num: {}. {}", batchTask.getTaskNum(), this);
+ }
+
private void waitingAllDownloadFinished() {
if (unfinishedSignatureToId.isEmpty()) {
downloadFinishedTime = System.currentTimeMillis();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
new file mode 100644
index 0000000000..b26cb2e1e7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.backup;
+
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * A snapshot kept by label: the raw bytes of the backup meta file and of the
+ * job info file, as passed to the constructor.
+ */
+public class Snapshot {
+    @SerializedName(value = "label")
+    private String label = null;
+
+    // raw bytes of the backup meta
+    @SerializedName(value = "meta")
+    private byte[] meta = null;
+
+    // raw bytes of the backup job info
+    @SerializedName(value = "jobInfo")
+    private byte[] jobInfo = null;
+
+    // NOTE(review): no constructor or setter in this class assigns createTime.
+    @SerializedName(value = "createTime")
+    private String createTime = null;
+
+    public Snapshot() {
+    }
+
+    public Snapshot(String label, byte[] meta, byte[] jobInfo) {
+        this.label = label;
+        this.meta = meta;
+        this.jobInfo = jobInfo;
+    }
+
+    public byte[] getMeta() {
+        return meta;
+    }
+
+    public byte[] getJobInfo() {
+        return jobInfo;
+    }
+
+    public String toJson() {
+        return GsonUtils.GSON.toJson(this);
+    }
+
+    @Override
+    public String toString() {
+        // Print the payload sizes instead of the arrays: concatenating a byte[] only
+        // yields its identity string (such as "[B@1b2c3d"), which says nothing in a log.
+        return "Snapshot{"
+                + "label='" + label + '\''
+                + ", meta=" + (meta == null ? "null" : meta.length + " bytes")
+                + ", jobInfo=" + (jobInfo == null ? "null" : jobInfo.length + " bytes")
+                + ", createTime='" + createTime + '\''
+                + '}';
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index b650100823..2dbb007498 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -19,11 +19,15 @@ package org.apache.doris.service;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.AddColumnsClause;
+import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.ColumnDef;
+import org.apache.doris.analysis.LabelName;
+import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TypeDef;
import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.backup.Snapshot;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
@@ -63,6 +67,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
+import org.apache.doris.qe.DdlExecutor;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.VariableMgr;
@@ -106,6 +111,8 @@ import org.apache.doris.thrift.TGetBinlogResult;
import org.apache.doris.thrift.TGetDbsParams;
import org.apache.doris.thrift.TGetDbsResult;
import org.apache.doris.thrift.TGetQueryStatsRequest;
+import org.apache.doris.thrift.TGetSnapshotRequest;
+import org.apache.doris.thrift.TGetSnapshotResult;
import org.apache.doris.thrift.TGetTablesParams;
import org.apache.doris.thrift.TGetTablesResult;
import org.apache.doris.thrift.TGetTabletReplicaInfosRequest;
@@ -137,11 +144,14 @@ import org.apache.doris.thrift.TReplicaInfo;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
import org.apache.doris.thrift.TReportRequest;
+import org.apache.doris.thrift.TRestoreSnapshotRequest;
+import org.apache.doris.thrift.TRestoreSnapshotResult;
import org.apache.doris.thrift.TRollbackTxnRequest;
import org.apache.doris.thrift.TRollbackTxnResult;
import org.apache.doris.thrift.TShowVariableRequest;
import org.apache.doris.thrift.TShowVariableResult;
import org.apache.doris.thrift.TSnapshotLoaderReportRequest;
+import org.apache.doris.thrift.TSnapshotType;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadPutRequest;
@@ -2124,15 +2134,15 @@ public class FrontendServiceImpl implements FrontendService.Iface {
throw new UserException("prev_commit_seq is not set");
}
+
+ // step 1: check auth
String cluster = request.getCluster();
if (Strings.isNullOrEmpty(cluster)) {
cluster = SystemInfoService.DEFAULT_CLUSTER;
}
-
- // step 1: check auth
if (Strings.isNullOrEmpty(request.getToken())) {
checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTable(),
- request.getUserIp(), PrivPredicate.LOAD);
+ request.getUserIp(), PrivPredicate.SELECT);
}
// step 3: check database
@@ -2181,4 +2191,182 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
return result;
}
+
+ /**
+  * Thrift entry point for fetching a snapshot. Delegates to getSnapshotImpl and
+  * converts any failure into an error status on the returned result:
+  * UserException becomes ANALYSIS_ERROR, anything else becomes INTERNAL_ERROR.
+  */
+ public TGetSnapshotResult getSnapshot(TGetSnapshotRequest request) throws TException {
+     String clientAddr = getClientAddrAsString();
+     LOG.trace("receive get snapshot info request: {}", request);
+
+     // Result returned only when the implementation throws; its status is filled in below.
+     TStatus failureStatus = new TStatus(TStatusCode.OK);
+     TGetSnapshotResult failureResult = new TGetSnapshotResult();
+     failureResult.setStatus(failureStatus);
+
+     try {
+         return getSnapshotImpl(request, clientAddr);
+     } catch (UserException e) {
+         LOG.warn("failed to get snapshot info: {}", e.getMessage());
+         failureStatus.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+         failureStatus.addToErrorMsgs(e.getMessage());
+     } catch (Throwable e) {
+         LOG.warn("catch unknown result.", e);
+         failureStatus.setStatusCode(TStatusCode.INTERNAL_ERROR);
+         failureStatus.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+     }
+
+     return failureResult;
+ }
+
+ /**
+  * Validates the request, checks the caller's privileges and looks up the snapshot
+  * stored under the requested label.
+  *
+  * @return a result with OK status carrying meta and job info, or a result with
+  *         SNAPSHOT_NOT_EXIST status when no snapshot is stored under the label
+  * @throws UserException if a required field is missing, the snapshot type is not
+  *         LOCAL, or the auth check fails
+  */
+ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp)
+         throws UserException {
+     // Step 1: all of user, passwd, db, label_name, snapshot_name, snapshot_type are required
+     if (!request.isSetUser()) {
+         throw new UserException("user is not set");
+     }
+     if (!request.isSetPasswd()) {
+         throw new UserException("passwd is not set");
+     }
+     if (!request.isSetDb()) {
+         throw new UserException("db is not set");
+     }
+     if (!request.isSetLabelName()) {
+         throw new UserException("label_name is not set");
+     }
+     // NOTE(review): snapshot_name is required but only logged; the lookup below uses label_name.
+     if (!request.isSetSnapshotName()) {
+         throw new UserException("snapshot_name is not set");
+     }
+     if (!request.isSetSnapshotType()) {
+         throw new UserException("snapshot_type is not set");
+     }
+     if (request.getSnapshotType() != TSnapshotType.LOCAL) {
+         throw new UserException("snapshot_type is not LOCAL");
+     }
+
+     // Step 2: check auth, falling back to the default cluster when none is given
+     String cluster = Strings.isNullOrEmpty(request.getCluster())
+             ? SystemInfoService.DEFAULT_CLUSTER
+             : request.getCluster();
+
+     LOG.info("get snapshot info, user: {}, db: {}, label_name: {}, snapshot_name: {}, snapshot_type: {}",
+             request.getUser(), request.getDb(), request.getLabelName(), request.getSnapshotName(),
+             request.getSnapshotType());
+     // NOTE(review): a non-empty token skips the password/privilege check and the token
+     // itself is not validated in this method - confirm it is verified elsewhere.
+     boolean hasToken = !Strings.isNullOrEmpty(request.getToken());
+     if (!hasToken) {
+         checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
+                 request.getTable(), clientIp, PrivPredicate.LOAD);
+     }
+
+     // Step 3: get snapshot
+     TStatus status = new TStatus(TStatusCode.OK);
+     TGetSnapshotResult result = new TGetSnapshotResult();
+     result.setStatus(status);
+
+     Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName());
+     if (snapshot != null) {
+         result.setMeta(snapshot.getMeta());
+         result.setJobInfo(snapshot.getJobInfo());
+         return result;
+     }
+
+     status.setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
+     status.addToErrorMsgs("snapshot not exist");
+     return result;
+ }
+
+ /**
+  * Thrift entry point for restoring from a snapshot. Delegates to restoreSnapshotImpl
+  * and converts any exception it throws into an error status on the returned result:
+  * UserException becomes ANALYSIS_ERROR, anything else becomes INTERNAL_ERROR.
+  */
+ public TRestoreSnapshotResult restoreSnapshot(TRestoreSnapshotRequest request) throws TException {
+     String clientAddr = getClientAddrAsString();
+     LOG.trace("receive restore snapshot info request: {}", request);
+
+     // Fallback result; it is replaced by the implementation's result unless that throws.
+     TRestoreSnapshotResult result = new TRestoreSnapshotResult();
+     TStatus status = new TStatus(TStatusCode.OK);
+     result.setStatus(status);
+     try {
+         result = restoreSnapshotImpl(request, clientAddr);
+     } catch (UserException e) {
+         // this is the restore path, so do not report it as a "get snapshot" failure
+         LOG.warn("failed to restore snapshot: {}", e.getMessage());
+         status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+         status.addToErrorMsgs(e.getMessage());
+     } catch (Throwable e) {
+         LOG.warn("catch unknown result.", e);
+         status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+         status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+     }
+
+     return result;
+ }
+
+ /**
+  * Validates the request, checks the caller's privileges, then builds a RestoreStmt from
+  * the meta and job info carried in the request and executes it through DdlExecutor.
+  *
+  * Failures while analyzing or executing the statement are reported through the status
+  * of the returned result (ANALYSIS_ERROR or INTERNAL_ERROR) rather than thrown.
+  *
+  * @throws UserException if a required field is missing or the auth check fails
+  */
+ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest request, String clientIp)
+         throws UserException {
+     // Step 1: Check all required arg: user, passwd, db, label_name, repo_name, meta, info
+     if (!request.isSetUser()) {
+         throw new UserException("user is not set");
+     }
+     if (!request.isSetPasswd()) {
+         throw new UserException("passwd is not set");
+     }
+     if (!request.isSetDb()) {
+         throw new UserException("db is not set");
+     }
+     if (!request.isSetLabelName()) {
+         throw new UserException("label_name is not set");
+     }
+     if (!request.isSetRepoName()) {
+         throw new UserException("repo_name is not set");
+     }
+     if (!request.isSetMeta()) {
+         throw new UserException("meta is not set");
+     }
+     if (!request.isSetJobInfo()) {
+         throw new UserException("job_info is not set");
+     }
+
+     // Step 2: check auth
+     String cluster = request.getCluster();
+     if (Strings.isNullOrEmpty(cluster)) {
+         cluster = SystemInfoService.DEFAULT_CLUSTER;
+     }
+
+     // NOTE(review): a non-empty token skips the password/privilege check and the token
+     // itself is not validated in this method - confirm it is verified elsewhere.
+     if (Strings.isNullOrEmpty(request.getToken())) {
+         checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
+                 request.getTable(), clientIp, PrivPredicate.LOAD);
+     }
+
+     // Step 3: build the restore statement and execute it
+     TRestoreSnapshotResult result = new TRestoreSnapshotResult();
+     TStatus status = new TStatus(TStatusCode.OK);
+     result.setStatus(status);
+
+     LabelName label = new LabelName(request.getDb(), request.getLabelName());
+     String repoName = request.getRepoName();
+     Map<String, String> properties = request.getProperties();
+     RestoreStmt restoreStmt = new RestoreStmt(label, repoName, null, properties, request.getMeta(),
+             request.getJobInfo());
+     LOG.trace("restore snapshot info, restoreStmt: {}", restoreStmt);
+     try {
+         // Reuse the thread's ConnectContext if there is one, otherwise install a new one.
+         // NOTE(review): a context created here stays bound to this thread after the call.
+         ConnectContext ctx = ConnectContext.get();
+         if (ctx == null) {
+             ctx = new ConnectContext();
+             ctx.setThreadLocalInfo();
+         }
+         ctx.setCluster(cluster);
+         ctx.setQualifiedUser(request.getUser());
+         UserIdentity currentUserIdentity = new UserIdentity(request.getUser(), "%");
+         ctx.setCurrentUserIdentity(currentUserIdentity);
+
+         Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
+         restoreStmt.analyze(analyzer);
+         DdlExecutor.execute(Env.getCurrentEnv(), restoreStmt);
+     } catch (UserException e) {
+         // this is the restore path, so do not report it as a "get snapshot" failure
+         LOG.warn("failed to restore snapshot: {}", e.getMessage());
+         status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+         status.addToErrorMsgs(e.getMessage());
+     } catch (Throwable e) {
+         LOG.warn("catch unknown result.", e);
+         status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+         status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+     }
+
+     return result;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
index 64b75a70d3..6482c5f807 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/DownloadTask.java
@@ -21,9 +21,11 @@ import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.thrift.TDownloadReq;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TRemoteTabletSnapshot;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TTaskType;
+import java.util.List;
import java.util.Map;
public class DownloadTask extends AgentTask {
@@ -34,6 +36,9 @@ public class DownloadTask extends AgentTask {
private Map<String, String> brokerProperties;
private StorageBackend.StorageType storageType;
private String location;
+ private List<TRemoteTabletSnapshot> remoteTabletSnapshots;
+ private boolean isFromLocalSnapshot = false;
+
public DownloadTask(TResourceInfo resourceInfo, long backendId, long signature, long jobId, long dbId,
Map<String, String> srcToDestPath, FsBroker brokerAddr, Map<String, String> brokerProperties,
@@ -45,6 +50,16 @@ public class DownloadTask extends AgentTask {
this.brokerProperties = brokerProperties;
this.storageType = storageType;
this.location = location;
+ this.isFromLocalSnapshot = false;
+ }
+
+ public DownloadTask(TResourceInfo resourceInfo, long backendId, long signature, long jobId, long dbId,
+ List<TRemoteTabletSnapshot> remoteTabletSnapshots) {
+ super(resourceInfo, backendId, TTaskType.DOWNLOAD, dbId, -1, -1, -1, -1, signature);
+ this.jobId = jobId;
+ this.srcToDestPath = new java.util.HashMap<String, String>();
+ this.remoteTabletSnapshots = remoteTabletSnapshots;
+ this.isFromLocalSnapshot = true;
}
public long getJobId() {
@@ -64,11 +79,22 @@ public class DownloadTask extends AgentTask {
}
public TDownloadReq toThrift() {
- TNetworkAddress address = new TNetworkAddress(brokerAddr.host, brokerAddr.port);
- TDownloadReq req = new TDownloadReq(jobId, srcToDestPath, address);
- req.setBrokerProp(brokerProperties);
- req.setStorageBackend(storageType.toThrift());
- req.setLocation(location);
+ // TDownloadReq declares the following fields as required, so both branches below must supply all three:
+ // 1: required i64 job_id
+ // 2: required map<string, string> src_dest_map
+ // 3: required Types.TNetworkAddress broker_addr
+ TDownloadReq req;
+ if (isFromLocalSnapshot) { // true only when built by the constructor that takes remoteTabletSnapshots
+ TNetworkAddress brokerAddr = new TNetworkAddress("", 0); // mock broker address: placeholder for the required broker_addr field. NOTE(review): this local shadows the brokerAddr field read in the else branch
+ req = new TDownloadReq(jobId, srcToDestPath, brokerAddr);
+ req.setRemoteTabletSnapshots(remoteTabletSnapshots); // optional thrift field 7 (remote_tablet_snapshots); broker_prop and location stay unset, storage_backend is not set explicitly
+ } else { // broker/storage download: builds the same request as the removed lines above
+ TNetworkAddress address = new TNetworkAddress(brokerAddr.host, brokerAddr.port);
+ req = new TDownloadReq(jobId, srcToDestPath, address);
+ req.setBrokerProp(brokerProperties);
+ req.setStorageBackend(storageType.toThrift());
+ req.setLocation(location);
+ }
return req;
}
}
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index 6b74944cd7..b30c7ed26a 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -288,6 +288,16 @@ struct TUploadReq {
6: optional string location // root path
}
+struct TRemoteTabletSnapshot { // element type of TDownloadReq.remote_tablet_snapshots (field 7); all fields optional. NOTE(review): per-field meanings below are inferred from names only - confirm against the BE consumer
+ 1: optional i64 local_tablet_id
+ 2: optional string local_snapshot_path // presumably the destination path on the receiving BE - confirm
+ 3: optional i64 remote_tablet_id
+ 4: optional i64 remote_be_id
+ 5: optional Types.TNetworkAddress remote_be_addr // presumably the address of the BE that holds the source snapshot - confirm
+ 6: optional string remote_snapshot_path
+ 7: optional string remote_token // presumably a credential presented to the remote BE - confirm
+}
+
struct TDownloadReq {
1: required i64 job_id
2: required map<string, string> src_dest_map
@@ -295,6 +305,7 @@ struct TDownloadReq {
4: optional map<string, string> broker_prop
5: optional Types.TStorageBackendType storage_backend = Types.TStorageBackendType.BROKER
6: optional string location // root path
+ 7: optional list<TRemoteTabletSnapshot> remote_tablet_snapshots
}
struct TSnapshotRequest {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 5aecdc4ba2..433b137a37 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -974,6 +974,52 @@ struct TGetTabletReplicaInfosResult {
3: optional string token
}
+enum TSnapshotType { // type of TGetSnapshotRequest.snapshot_type (field 9)
+ REMOTE = 0,
+ LOCAL = 1,
+}
+
+struct TGetSnapshotRequest { // argument of FrontendService.getSnapshot; every field is optional at the IDL level, so any presence checks must be done by the handler
+ 1: optional string cluster
+ 2: optional string user
+ 3: optional string passwd
+ 4: optional string db
+ 5: optional string table
+ 6: optional string token
+ 7: optional string label_name
+ 8: optional string snapshot_name
+ 9: optional TSnapshotType snapshot_type // REMOTE = 0 or LOCAL = 1
+}
+
+struct TGetSnapshotResult { // return type of FrontendService.getSnapshot
+ 1: optional Status.TStatus status
+ 2: optional binary meta // opaque bytes; presumably a serialized BackupMeta - confirm
+ 3: optional binary job_info // opaque bytes; presumably a serialized BackupJobInfo - confirm
+}
+
+struct TTableRef { // element type of TRestoreSnapshotRequest.table_refs (field 9)
+ 1: optional string table
+}
+
+struct TRestoreSnapshotRequest { // argument of FrontendService.restoreSnapshot; every field is optional at the IDL level
+ 1: optional string cluster
+ 2: optional string user
+ 3: optional string passwd
+ 4: optional string db
+ 5: optional string table
+ 6: optional string token
+ 7: optional string label_name
+ 8: optional string repo_name
+ 9: optional list<TTableRef> table_refs
+ 10: optional map<string, string> properties
+ 11: optional binary meta // same name and type as TGetSnapshotResult.meta; presumably those bytes are passed through - confirm
+ 12: optional binary job_info // same name and type as TGetSnapshotResult.job_info; presumably those bytes are passed through - confirm
+}
+
+struct TRestoreSnapshotResult { // return type of FrontendService.restoreSnapshot; carries only a status
+ 1: optional Status.TStatus status
+}
+
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1006,6 +1052,8 @@ service FrontendService {
TCommitTxnResult commitTxn(1: TCommitTxnRequest request)
TRollbackTxnResult rollbackTxn(1: TRollbackTxnRequest request)
TGetBinlogResult getBinlog(1: TGetBinlogRequest request)
+ TGetSnapshotResult getSnapshot(1: TGetSnapshotRequest request)
+ TRestoreSnapshotResult restoreSnapshot(1: TRestoreSnapshotRequest request)
TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest request)
diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift
index 7edaecf7fb..0342f9bce0 100644
--- a/gensrc/thrift/Status.thrift
+++ b/gensrc/thrift/Status.thrift
@@ -91,6 +91,9 @@ enum TStatusCode {
BINLOG_TOO_NEW_COMMIT_SEQ = 62,
BINLOG_NOT_FOUND_DB = 63,
BINLOG_NOT_FOUND_TABLE = 64,
+
+ // Snapshot Related from 70
+ SNAPSHOT_NOT_EXIST = 70,
}
struct TStatus {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org