You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this conversion.
Posted to commits@doris.apache.org by li...@apache.org on 2020/06/18 01:56:14 UTC
[incubator-doris] branch master updated: [Memory Engine] MemTablet
creation and compatibility handling in BE (#3762)
This is an automated email from the ASF dual-hosted git repository.
lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new ca96ea3 [Memory Engine] MemTablet creation and compatibility handling in BE (#3762)
ca96ea3 is described below
commit ca96ea30560c9e9837c28cfd2cdd8ed24196f787
Author: Binglin Chang <de...@gmail.com>
AuthorDate: Thu Jun 18 09:56:07 2020 +0800
[Memory Engine] MemTablet creation and compatibility handling in BE (#3762)
---
be/src/agent/task_worker_pool.cpp | 13 +-
be/src/http/action/meta_action.cpp | 5 +-
be/src/olap/base_tablet.cpp | 30 +++-
be/src/olap/base_tablet.h | 138 +++++++++++++++
be/src/olap/data_dir.cpp | 4 +-
be/src/olap/data_dir.h | 5 +-
be/src/olap/memory/mem_tablet.cpp | 34 +++-
be/src/olap/memory/mem_tablet.h | 19 ++-
be/src/olap/storage_engine.cpp | 12 +-
be/src/olap/tablet.cpp | 25 +--
be/src/olap/tablet.h | 146 ++--------------
be/src/olap/tablet_manager.cpp | 286 +++++++++++++++++++++++---------
be/src/olap/tablet_manager.h | 30 ++--
be/src/olap/tablet_meta.cpp | 2 +-
be/test/olap/memory/mem_tablet_test.cpp | 2 +-
gensrc/thrift/MasterService.thrift | 1 +
16 files changed, 476 insertions(+), 276 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 42f178a..d54b090 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -344,8 +344,9 @@ void* TaskWorkerPool::_create_tablet_worker_thread_callback(void* arg_this) {
} else {
++_s_report_version;
// get path hash of the created tablet
- TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
- create_tablet_req.tablet_id, create_tablet_req.tablet_schema.schema_hash);
+ BaseTabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->
+ get_base_tablet(create_tablet_req.tablet_id,
+ create_tablet_req.tablet_schema.schema_hash);
DCHECK(tablet != nullptr);
TTabletInfo tablet_info;
tablet_info.tablet_id = tablet->table_id();
@@ -399,8 +400,8 @@ void* TaskWorkerPool::_drop_tablet_worker_thread_callback(void* arg_this) {
TStatusCode::type status_code = TStatusCode::OK;
vector<string> error_msgs;
TStatus task_status;
- TabletSharedPtr dropped_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
- drop_tablet_req.tablet_id, drop_tablet_req.schema_hash);
+ BaseTabletSharedPtr dropped_tablet = StorageEngine::instance()->tablet_manager()->
+ get_base_tablet(drop_tablet_req.tablet_id, drop_tablet_req.schema_hash);
if (dropped_tablet != nullptr) {
OLAPStatus drop_status = StorageEngine::instance()->tablet_manager()->drop_tablet(
drop_tablet_req.tablet_id, drop_tablet_req.schema_hash);
@@ -827,8 +828,8 @@ void* TaskWorkerPool::_update_tablet_meta_worker_thread_callback(void* arg_this)
TStatus task_status;
for (auto tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) {
- TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
- tablet_meta_info.tablet_id, tablet_meta_info.schema_hash);
+ BaseTabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->
+ get_base_tablet(tablet_meta_info.tablet_id, tablet_meta_info.schema_hash);
if (tablet == nullptr) {
LOG(WARNING) << "could not find tablet when update partition id"
<< " tablet_id=" << tablet_meta_info.tablet_id
diff --git a/be/src/http/action/meta_action.cpp b/be/src/http/action/meta_action.cpp
index 2fad2a9..90c90a0 100644
--- a/be/src/http/action/meta_action.cpp
+++ b/be/src/http/action/meta_action.cpp
@@ -53,9 +53,8 @@ Status MetaAction::_handle_header(HttpRequest* req, std::string* json_meta) {
<< ", schema_hash:" << req_schema_hash;
return Status::InternalError(strings::Substitute("convert failed, $0", e.what()));
}
-
- TabletSharedPtr tablet =
- StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash);
+ BaseTabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_base_tablet(
+ tablet_id, schema_hash);
if (tablet == nullptr) {
LOG(WARNING) << "no tablet for tablet_id:" << tablet_id << " schema hash:" << schema_hash;
return Status::InternalError("no tablet exist");
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index d544f36..7346715 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -26,12 +26,31 @@ BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
: _state(tablet_meta->tablet_state()),
_tablet_meta(tablet_meta),
_schema(tablet_meta->tablet_schema()),
- _data_dir(data_dir) {
+ _data_dir(data_dir),
+ _is_bad(false) {
_gen_tablet_path();
}
BaseTablet::~BaseTablet() {}
+
+OLAPStatus BaseTablet::init() {
+ return _init_once.call([this] { return _init_once_action(); });
+}
+
+// should save tablet meta to remote meta store
+// if it's a primary replica
+void BaseTablet::save_meta() {
+ auto res = _tablet_meta->save_meta(_data_dir);
+ CHECK_EQ(res, OLAP_SUCCESS) << "fail to save tablet_meta. res=" << res
+ << ", root=" << _data_dir->path();
+ // User could directly update tablet schema by _tablet_meta,
+ // So we need to refetch schema again
+ _schema = _tablet_meta->tablet_schema();
+ // TODO: update _mem_schema too?
+}
+
+
OLAPStatus BaseTablet::set_tablet_state(TabletState state) {
if (_tablet_meta->tablet_state() == TABLET_SHUTDOWN && state != TABLET_SHUTDOWN) {
LOG(WARNING) << "could not change tablet state from shutdown to " << state;
@@ -52,4 +71,13 @@ void BaseTablet::_gen_tablet_path() {
}
}
+OLAPStatus BaseTablet::set_partition_id(int64_t partition_id) {
+ return _tablet_meta->set_partition_id(partition_id);
+}
+
+TabletInfo BaseTablet::get_tablet_info() const {
+ return TabletInfo(tablet_id(), schema_hash(), tablet_uid());
+}
+
+
} /* namespace doris */
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index f3b0c2d..f3ebfb0 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -20,13 +20,19 @@
#include <memory>
+#include "gen_cpp/AgentService_types.h"
+#include "gen_cpp/MasterService_types.h"
+#include "olap/data_dir.h"
#include "olap/olap_define.h"
#include "olap/tablet_meta.h"
#include "olap/utils.h"
+#include "util/once.h"
namespace doris {
class DataDir;
+class BaseTablet;
+using BaseTabletSharedPtr = std::shared_ptr<BaseTablet>;
// Base class for all tablet classes, currently only olap/Tablet and
// olap/memory/MemTablet.
@@ -60,11 +66,54 @@ public:
inline void set_creation_time(int64_t creation_time);
inline bool equal(int64_t tablet_id, int32_t schema_hash);
+ OLAPStatus init();
+ inline bool init_succeeded();
+
+ bool is_used();
+
+ void save_meta();
+
+ void register_tablet_into_dir();
+ void deregister_tablet_from_dir();
+
+
// properties encapsulated in TabletSchema
inline const TabletSchema& tablet_schema() const;
+ inline size_t tablet_footprint(); // disk space occupied by tablet
+ inline size_t num_rows();
+ inline int version_count() const;
+ inline Version max_version() const;
+
+ // properties encapsulated in TabletSchema
+ inline KeysType keys_type() const;
+ inline size_t num_columns() const;
+ inline size_t num_null_columns() const;
+ inline size_t num_key_columns() const;
+ inline size_t num_short_key_columns() const;
+ inline size_t num_rows_per_row_block() const;
+ inline CompressKind compress_kind() const;
+ inline double bloom_filter_fpp() const;
+ inline size_t next_unique_id() const;
+ inline size_t row_size() const;
+ inline size_t field_index(const string& field_name) const;
+
+ OLAPStatus set_partition_id(int64_t partition_id);
+
+ TabletInfo get_tablet_info() const;
+
+ // meta lock
+ inline void obtain_header_rdlock() { _meta_lock.rdlock(); }
+ inline void obtain_header_wrlock() { _meta_lock.wrlock(); }
+ inline void release_header_lock() { _meta_lock.unlock(); }
+ inline RWMutex* get_header_lock_ptr() { return &_meta_lock; }
+
+ virtual void build_tablet_report_info(TTabletInfo* tablet_info) = 0;
+
+ virtual void delete_all_files() = 0;
protected:
void _gen_tablet_path();
+ virtual OLAPStatus _init_once_action() = 0;
protected:
TabletState _state;
@@ -74,6 +123,13 @@ protected:
DataDir* _data_dir;
std::string _tablet_path;
+ DorisCallOnce<OLAPStatus> _init_once;
+ // TODO(lingbin): There is a _meta_lock in TabletMeta too; there should be a comment to
+ // explain how these two locks work together.
+ mutable RWMutex _meta_lock;
+ // if this tablet is broken, set to true. default is false
+ std::atomic<bool> _is_bad;
+
private:
DISALLOW_COPY_AND_ASSIGN(BaseTablet);
};
@@ -141,6 +197,88 @@ inline const TabletSchema& BaseTablet::tablet_schema() const {
return _schema;
}
+inline bool BaseTablet::init_succeeded() {
+ return _init_once.has_called() && _init_once.stored_result() == OLAP_SUCCESS;
+}
+
+inline bool BaseTablet::is_used() {
+ return !_is_bad && _data_dir->is_used();
+}
+
+inline void BaseTablet::register_tablet_into_dir() {
+ _data_dir->register_tablet(this);
+}
+
+inline void BaseTablet::deregister_tablet_from_dir() {
+ _data_dir->deregister_tablet(this);
+}
+
+// TODO(lingbin): Why other methods that need to get information from _tablet_meta
+// are not locked, here needs a comment to explain.
+inline size_t BaseTablet::tablet_footprint() {
+ ReadLock rdlock(&_meta_lock);
+ return _tablet_meta->tablet_footprint();
+}
+
+// TODO(lingbin): Why other methods which need to get information from _tablet_meta
+// are not locked, here needs a comment to explain.
+inline size_t BaseTablet::num_rows() {
+ ReadLock rdlock(&_meta_lock);
+ return _tablet_meta->num_rows();
+}
+
+inline int BaseTablet::version_count() const {
+ return _tablet_meta->version_count();
+}
+
+inline Version BaseTablet::max_version() const {
+ return _tablet_meta->max_version();
+}
+
+inline KeysType BaseTablet::keys_type() const {
+ return _schema.keys_type();
+}
+
+inline size_t BaseTablet::num_columns() const {
+ return _schema.num_columns();
+}
+
+inline size_t BaseTablet::num_null_columns() const {
+ return _schema.num_null_columns();
+}
+
+inline size_t BaseTablet::num_key_columns() const {
+ return _schema.num_key_columns();
+}
+
+inline size_t BaseTablet::num_short_key_columns() const {
+ return _schema.num_short_key_columns();
+}
+
+inline size_t BaseTablet::num_rows_per_row_block() const {
+ return _schema.num_rows_per_row_block();
+}
+
+inline CompressKind BaseTablet::compress_kind() const {
+ return _schema.compress_kind();
+}
+
+inline double BaseTablet::bloom_filter_fpp() const {
+ return _schema.bloom_filter_fpp();
+}
+
+inline size_t BaseTablet::next_unique_id() const {
+ return _schema.next_column_unique_id();
+}
+
+inline size_t BaseTablet::field_index(const string& field_name) const {
+ return _schema.field_index(field_name);
+}
+
+inline size_t BaseTablet::row_size() const {
+ return _schema.row_size();
+}
+
} /* namespace doris */
#endif /* DORIS_BE_SRC_OLAP_BASE_TABLET_H */
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index f577f97..8cabfd9 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -334,14 +334,14 @@ OLAPStatus DataDir::get_shard(uint64_t* shard) {
return OLAP_SUCCESS;
}
-void DataDir::register_tablet(Tablet* tablet) {
+void DataDir::register_tablet(BaseTablet* tablet) {
TabletInfo tablet_info(tablet->tablet_id(), tablet->schema_hash(), tablet->tablet_uid());
std::lock_guard<std::mutex> l(_mutex);
_tablet_set.emplace(std::move(tablet_info));
}
-void DataDir::deregister_tablet(Tablet* tablet) {
+void DataDir::deregister_tablet(BaseTablet* tablet) {
TabletInfo tablet_info(tablet->tablet_id(), tablet->schema_hash(), tablet->tablet_uid());
std::lock_guard<std::mutex> l(_mutex);
diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h
index 58992ed..f8df04a 100644
--- a/be/src/olap/data_dir.h
+++ b/be/src/olap/data_dir.h
@@ -32,6 +32,7 @@
namespace doris {
+class BaseTablet;
class Tablet;
class TabletManager;
class TabletMeta;
@@ -80,8 +81,8 @@ public:
TStorageMedium::type storage_medium() const { return _storage_medium; }
- void register_tablet(Tablet* tablet);
- void deregister_tablet(Tablet* tablet);
+ void register_tablet(BaseTablet* tablet);
+ void deregister_tablet(BaseTablet* tablet);
void clear_tablets(std::vector<TabletInfo>* tablet_infos);
std::string get_absolute_shard_path(int64_t shard_id);
diff --git a/be/src/olap/memory/mem_tablet.cpp b/be/src/olap/memory/mem_tablet.cpp
index 03a9d45..0e5f33b 100644
--- a/be/src/olap/memory/mem_tablet.cpp
+++ b/be/src/olap/memory/mem_tablet.cpp
@@ -36,11 +36,18 @@ std::shared_ptr<MemTablet> MemTablet::create_tablet_from_meta(TabletMetaSharedPt
return std::make_shared<MemTablet>(tablet_meta, data_dir);
}
-Status MemTablet::init() {
+OLAPStatus MemTablet::_init_once_action() {
_max_version = 0;
- return MemSubTablet::create(0, *_mem_schema.get(), &_sub_tablet);
+ Status ret = MemSubTablet::create(0, *_mem_schema.get(), &_sub_tablet);
+ if (ret.ok()) {
+ return OLAP_SUCCESS;
+ } else {
+ // TODO: Status/OLAPStatus compatibility
+ return OLAP_ERR_INIT_FAILED;
+ }
}
+
Status MemTablet::scan(std::unique_ptr<ScanSpec>* spec, std::unique_ptr<MemTabletScan>* scan) {
uint64_t version = (*spec)->version();
if (version == UINT64_MAX) {
@@ -86,5 +93,28 @@ Status MemTablet::commit_write_txn(WriteTxn* wtxn, uint64_t version) {
return Status::OK();
}
+void MemTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
+ ReadLock rdlock(&_meta_lock);
+ tablet_info->tablet_id = _tablet_meta->tablet_id();
+ tablet_info->schema_hash = _tablet_meta->schema_hash();
+ tablet_info->row_count = _tablet_meta->num_rows();
+ tablet_info->data_size = _tablet_meta->tablet_footprint();
+ tablet_info->version = _max_version;
+ tablet_info->version_hash = 0;
+ tablet_info->__set_partition_id(_tablet_meta->partition_id());
+ tablet_info->__set_storage_medium(_data_dir->storage_medium());
+ tablet_info->__set_version_count(_tablet_meta->version_count());
+ tablet_info->__set_path_hash(_data_dir->path_hash());
+ tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema().is_in_memory());
+ tablet_info->__set_tablet_type(_tablet_meta->tablet_type() == TabletTypePB::TABLET_TYPE_DISK ?
+ TTabletType::TABLET_TYPE_DISK : TTabletType::TABLET_TYPE_MEMORY);
+}
+
+void MemTablet::delete_all_files() {
+ // TODO:
+}
+
+
+
} // namespace memory
} // namespace doris
diff --git a/be/src/olap/memory/mem_tablet.h b/be/src/olap/memory/mem_tablet.h
index dfafa27..03c3933 100644
--- a/be/src/olap/memory/mem_tablet.h
+++ b/be/src/olap/memory/mem_tablet.h
@@ -27,6 +27,15 @@ class MemSubTablet;
class ScanSpec;
class MemTabletScan;
class WriteTxn;
+class MemTablet;
+using MemTabletSharedPtr = std::shared_ptr<MemTablet>;
+
+inline MemTabletSharedPtr to_mem_tablet(const BaseTabletSharedPtr& base) {
+ if (base->is_memory()) {
+ return std::static_pointer_cast<MemTablet>(base);
+ }
+ return MemTabletSharedPtr();
+}
// Tablet class for memory-optimized storage engine.
//
@@ -49,9 +58,6 @@ public:
virtual ~MemTablet();
- // Initialize
- Status init();
-
// Scan the tablet, return a MemTabletScan object scan, user can specify projections
// using ScanSpec, currently only support full scan with projection, will support
// filter/aggregation in the future.
@@ -70,6 +76,13 @@ public:
// Note: commit is done sequentially, protected by internal write lock
Status commit_write_txn(WriteTxn* wtxn, uint64_t version);
+ virtual void build_tablet_report_info(TTabletInfo* tablet_info);
+
+ virtual void delete_all_files();
+
+protected:
+ virtual OLAPStatus _init_once_action();
+
private:
friend class MemTabletScan;
// memory::Schema is used internally rather than TabletSchema, so we need an extra
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 61d7ab9..165b4c2 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -907,9 +907,9 @@ OLAPStatus StorageEngine::execute_task(EngineTask* task) {
vector<TabletInfo> tablet_infos;
task->get_related_tablets(&tablet_infos);
sort(tablet_infos.begin(), tablet_infos.end());
- vector<TabletSharedPtr> related_tablets;
+ vector<BaseTabletSharedPtr> related_tablets;
for (TabletInfo& tablet_info : tablet_infos) {
- TabletSharedPtr tablet = _tablet_manager->get_tablet(
+ BaseTabletSharedPtr tablet = _tablet_manager->get_base_tablet(
tablet_info.tablet_id, tablet_info.schema_hash);
if (tablet != nullptr) {
related_tablets.push_back(tablet);
@@ -921,7 +921,7 @@ OLAPStatus StorageEngine::execute_task(EngineTask* task) {
}
// add write lock to all related tablets
OLAPStatus prepare_status = task->prepare();
- for (TabletSharedPtr& tablet : related_tablets) {
+ for (auto& tablet : related_tablets) {
tablet->release_header_lock();
}
if (prepare_status != OLAP_SUCCESS) {
@@ -943,9 +943,9 @@ OLAPStatus StorageEngine::execute_task(EngineTask* task) {
// related tablets may be changed after execute task, so that get them here again
task->get_related_tablets(&tablet_infos);
sort(tablet_infos.begin(), tablet_infos.end());
- vector<TabletSharedPtr> related_tablets;
+ vector<BaseTabletSharedPtr> related_tablets;
for (TabletInfo& tablet_info : tablet_infos) {
- TabletSharedPtr tablet = _tablet_manager->get_tablet(
+ auto tablet = _tablet_manager->get_base_tablet(
tablet_info.tablet_id, tablet_info.schema_hash);
if (tablet != nullptr) {
related_tablets.push_back(tablet);
@@ -957,7 +957,7 @@ OLAPStatus StorageEngine::execute_task(EngineTask* task) {
}
// add write lock to all related tablets
OLAPStatus fin_status = task->finish();
- for (TabletSharedPtr& tablet : related_tablets) {
+ for (auto& tablet : related_tablets) {
tablet->release_header_lock();
}
return fin_status;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index e48ca02..429c37a 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -55,7 +55,6 @@ TabletSharedPtr Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) :
BaseTablet(tablet_meta, data_dir),
- _is_bad(false),
_last_cumu_compaction_failure_millis(0),
_last_base_compaction_failure_millis(0),
_last_cumu_compaction_success_millis(0),
@@ -102,20 +101,6 @@ OLAPStatus Tablet::_init_once_action() {
return res;
}
-OLAPStatus Tablet::init() {
- return _init_once.call([this] { return _init_once_action(); });
-}
-
-// should save tablet meta to remote meta store
-// if it's a primary replica
-void Tablet::save_meta() {
- auto res = _tablet_meta->save_meta(_data_dir);
- CHECK_EQ(res, OLAP_SUCCESS) << "fail to save tablet_meta. res=" << res << ", root=" << _data_dir->path();
- // User could directly update tablet schema by _tablet_meta,
- // So we need to refetch schema again
- _schema = _tablet_meta->tablet_schema();
-}
-
OLAPStatus Tablet::revise_tablet_meta(
const vector<RowsetMetaSharedPtr>& rowsets_to_clone,
const vector<Version>& versions_to_delete) {
@@ -844,14 +829,6 @@ OLAPStatus Tablet::_contains_version(const Version& version) {
return OLAP_SUCCESS;
}
-OLAPStatus Tablet::set_partition_id(int64_t partition_id) {
- return _tablet_meta->set_partition_id(partition_id);
-}
-
-TabletInfo Tablet::get_tablet_info() const {
- return TabletInfo(tablet_id(), schema_hash(), tablet_uid());
-}
-
void Tablet::pick_candicate_rowsets_to_cumulative_compaction(int64_t skip_window_sec,
std::vector<RowsetSharedPtr>* candidate_rowsets) {
int64_t now = UnixSeconds();
@@ -1045,6 +1022,8 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
tablet_info->__set_version_count(_tablet_meta->version_count());
tablet_info->__set_path_hash(_data_dir->path_hash());
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema().is_in_memory());
+ tablet_info->__set_tablet_type(_tablet_meta->tablet_type() == TabletTypePB::TABLET_TYPE_DISK ?
+ TTabletType::TABLET_TYPE_DISK : TTabletType::TABLET_TYPE_MEMORY);
}
// should use this method to get a copy of current tablet meta
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 40cac13..e7bd244 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -46,6 +46,13 @@ class TabletMeta;
using TabletSharedPtr = std::shared_ptr<Tablet>;
+inline TabletSharedPtr to_tablet(const BaseTabletSharedPtr& base) {
+ if (base->is_memory()) {
+ return TabletSharedPtr();
+ }
+ return std::static_pointer_cast<Tablet>(base);
+}
+
class Tablet : public BaseTablet {
public:
static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
@@ -53,15 +60,6 @@ public:
Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir);
- OLAPStatus init();
- inline bool init_succeeded();
-
- bool is_used();
-
- void register_tablet_into_dir();
- void deregister_tablet_from_dir();
-
- void save_meta();
// Used in clone task, to update local meta when finishing a clone job
OLAPStatus revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowsets_to_clone,
const std::vector<Version>& versions_to_delete);
@@ -69,24 +67,6 @@ public:
inline const int64_t cumulative_layer_point() const;
inline void set_cumulative_layer_point(int64_t new_point);
- inline size_t tablet_footprint(); // disk space occupied by tablet
- inline size_t num_rows();
- inline int version_count() const;
- inline Version max_version() const;
-
- // propreties encapsulated in TabletSchema
- inline KeysType keys_type() const;
- inline size_t num_columns() const;
- inline size_t num_null_columns() const;
- inline size_t num_key_columns() const;
- inline size_t num_short_key_columns() const;
- inline size_t num_rows_per_row_block() const;
- inline CompressKind compress_kind() const;
- inline double bloom_filter_fpp() const;
- inline size_t next_unique_id() const;
- inline size_t row_size() const;
- inline size_t field_index(const string& field_name) const;
-
// operation in rowsets
OLAPStatus add_rowset(RowsetSharedPtr rowset, bool need_persist = true);
void modify_rowsets(const vector<RowsetSharedPtr>& to_add,
@@ -128,12 +108,6 @@ public:
void delete_alter_task();
OLAPStatus set_alter_state(AlterTabletState state);
- // meta lock
- inline void obtain_header_rdlock() { _meta_lock.rdlock(); }
- inline void obtain_header_wrlock() { _meta_lock.wrlock(); }
- inline void release_header_lock() { _meta_lock.unlock(); }
- inline RWMutex* get_header_lock_ptr() { return &_meta_lock; }
-
// ingest lock
inline void obtain_push_lock() { _ingest_lock.lock(); }
inline void release_push_lock() { _ingest_lock.unlock(); }
@@ -193,15 +167,9 @@ public:
int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; }
void set_last_base_compaction_success_time(int64_t millis) { _last_base_compaction_success_millis = millis; }
- void delete_all_files();
-
bool check_path(const std::string& check_path) const;
bool check_rowset_id(const RowsetId& rowset_id);
- OLAPStatus set_partition_id(int64_t partition_id);
-
- TabletInfo get_tablet_info() const;
-
void pick_candicate_rowsets_to_cumulative_compaction(int64_t skip_window_sec,
std::vector<RowsetSharedPtr>* candidate_rowsets);
void pick_candicate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets);
@@ -219,20 +187,24 @@ public:
bool rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta);
- void build_tablet_report_info(TTabletInfo* tablet_info);
-
void generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) const;
// return a json string to show the compaction status of this tablet
void get_compaction_status(std::string* json_result);
+ virtual void build_tablet_report_info(TTabletInfo* tablet_info);
+
+ virtual void delete_all_files();
+
+protected:
+ virtual OLAPStatus _init_once_action();
+
private:
- OLAPStatus _init_once_action();
void _print_missed_versions(const std::vector<Version>& missed_versions) const;
bool _contains_rowset(const RowsetId rowset_id);
OLAPStatus _contains_version(const Version& version);
void _max_continuous_version_from_begining_unlocked(Version* version,
- VersionHash* v_hash) const ;
+ VersionHash* v_hash) const;
RowsetSharedPtr _rowset_with_largest_size();
void _delete_inc_rowset_by_version(const Version& version, const VersionHash& version_hash);
OLAPStatus _capture_consistent_rowsets_unlocked(const vector<Version>& version_path,
@@ -243,7 +215,6 @@ private:
RowsetGraph _rs_graph;
- DorisCallOnce<OLAPStatus> _init_once;
// meta store lock is used for prevent 2 threads do checkpoint concurrently
// it will be used in econ-mode in the future
RWMutex _meta_store_lock;
@@ -252,9 +223,6 @@ private:
Mutex _cumulative_lock;
RWMutex _migration_lock;
- // TODO(lingbin): There is a _meta_lock TabletMeta too, there should be a comment to
- // explain how these two locks work together.
- mutable RWMutex _meta_lock;
// A new load job will produce a new rowset, which will be inserted into both _rs_version_map
// and _inc_rs_version_map. Only the most recent rowsets are kept in _inc_rs_version_map to
// reduce the amount of data that needs to be copied during the clone task.
@@ -268,8 +236,6 @@ private:
std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _rs_version_map;
std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _inc_rs_version_map;
- // if this tablet is broken, set to true. default is false
- std::atomic<bool> _is_bad;
// timestamp of last cumu compaction failure
std::atomic<int64_t> _last_cumu_compaction_failure_millis;
// timestamp of last base compaction failure
@@ -285,22 +251,6 @@ private:
DISALLOW_COPY_AND_ASSIGN(Tablet);
};
-inline bool Tablet::init_succeeded() {
- return _init_once.has_called() && _init_once.stored_result() == OLAP_SUCCESS;
-}
-
-inline bool Tablet::is_used() {
- return !_is_bad && _data_dir->is_used();
-}
-
-inline void Tablet::register_tablet_into_dir() {
- _data_dir->register_tablet(this);
-}
-
-inline void Tablet::deregister_tablet_from_dir() {
- _data_dir->deregister_tablet(this);
-}
-
inline const int64_t Tablet::cumulative_layer_point() const {
return _cumulative_point;
@@ -311,72 +261,6 @@ inline void Tablet::set_cumulative_layer_point(int64_t new_point) {
}
-// TODO(lingbin): Why other methods that need to get information from _tablet_meta
-// are not locked, here needs a comment to explain.
-inline size_t Tablet::tablet_footprint() {
- ReadLock rdlock(&_meta_lock);
- return _tablet_meta->tablet_footprint();
-}
-
-// TODO(lingbin): Why other methods which need to get information from _tablet_meta
-// are not locked, here needs a comment to explain.
-inline size_t Tablet::num_rows() {
- ReadLock rdlock(&_meta_lock);
- return _tablet_meta->num_rows();
-}
-
-inline int Tablet::version_count() const {
- return _tablet_meta->version_count();
-}
-
-inline Version Tablet::max_version() const {
- return _tablet_meta->max_version();
-}
-
-inline KeysType Tablet::keys_type() const {
- return _schema.keys_type();
-}
-
-inline size_t Tablet::num_columns() const {
- return _schema.num_columns();
-}
-
-inline size_t Tablet::num_null_columns() const {
- return _schema.num_null_columns();
-}
-
-inline size_t Tablet::num_key_columns() const {
- return _schema.num_key_columns();
-}
-
-inline size_t Tablet::num_short_key_columns() const {
- return _schema.num_short_key_columns();
-}
-
-inline size_t Tablet::num_rows_per_row_block() const {
- return _schema.num_rows_per_row_block();
-}
-
-inline CompressKind Tablet::compress_kind() const {
- return _schema.compress_kind();
-}
-
-inline double Tablet::bloom_filter_fpp() const {
- return _schema.bloom_filter_fpp();
-}
-
-inline size_t Tablet::next_unique_id() const {
- return _schema.next_column_unique_id();
-}
-
-inline size_t Tablet::field_index(const string& field_name) const {
- return _schema.field_index(field_name);
-}
-
-inline size_t Tablet::row_size() const {
- return _schema.row_size();
-}
-
}
#endif // DORIS_BE_SRC_OLAP_TABLET_H
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 3c3fbe5..aafd7d9 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -41,6 +41,7 @@
#include "olap/rowset/column_data_writer.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_id_generator.h"
+#include "olap/memory/mem_tablet.h"
#include "olap/schema_change.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
@@ -62,7 +63,7 @@ using strings::Substitute;
namespace doris {
-static bool _cmp_tablet_by_create_time(const TabletSharedPtr& a, const TabletSharedPtr& b) {
+static bool _cmp_tablet_by_create_time(const BaseTabletSharedPtr& a, const BaseTabletSharedPtr& b) {
return a->creation_time() < b->creation_time();
}
@@ -81,15 +82,15 @@ TabletManager::~TabletManager() {
}
OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash,
- const TabletSharedPtr& tablet,
+ const BaseTabletSharedPtr& base_tablet,
bool update_meta, bool force) {
OLAPStatus res = OLAP_SUCCESS;
VLOG(3) << "begin to add tablet to TabletManager. " << "tablet_id=" << tablet_id
<< ", schema_hash=" << schema_hash << ", force=" << force;
- TabletSharedPtr existed_tablet = nullptr;
+ BaseTabletSharedPtr existed_tablet = nullptr;
tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
- for (TabletSharedPtr item : tablet_map[tablet_id].table_arr) {
+ for (auto& item : tablet_map[tablet_id].table_arr) {
if (item->equal(tablet_id, schema_hash)) {
existed_tablet = item;
break;
@@ -98,26 +99,34 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash s
if (existed_tablet == nullptr) {
return _add_tablet_to_map_unlocked(tablet_id, schema_hash,
- tablet, update_meta,
+ base_tablet, update_meta,
false /*keep_files*/, false /*drop_old*/);
}
if (!force) {
- if (existed_tablet->tablet_path() == tablet->tablet_path()) {
+ if (existed_tablet->tablet_path() == base_tablet->tablet_path()) {
LOG(WARNING) << "add the same tablet twice! tablet_id=" << tablet_id
<< ", schema_hash=" << schema_hash
- << ", tablet_path=" << tablet->tablet_path();
+ << ", tablet_path=" << base_tablet->tablet_path();
return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE;
}
- if (existed_tablet->data_dir() == tablet->data_dir()) {
+ if (existed_tablet->data_dir() == base_tablet->data_dir()) {
LOG(WARNING) << "add tablet with same data dir twice! tablet_id=" << tablet_id
<< ", schema_hash=" << schema_hash;
return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE;
}
}
+ if (base_tablet->is_memory() || existed_tablet->is_memory()) {
+ LOG(WARNING) << "add the same MemTablet twice! tablet_id=" << tablet_id
+ << ", schema_hash=" << schema_hash
+ << ", tablet_path=" << base_tablet->tablet_path();
+ return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE;
+ }
+
+ TabletSharedPtr tablet = to_tablet(base_tablet);
existed_tablet->obtain_header_rdlock();
- const RowsetSharedPtr old_rowset = existed_tablet->rowset_with_max_version();
+ const RowsetSharedPtr old_rowset = to_tablet(existed_tablet)->rowset_with_max_version();
const RowsetSharedPtr new_rowset = tablet->rowset_with_max_version();
// If new tablet is empty, it is a newly created schema change tablet.
@@ -163,7 +172,7 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash s
}
OLAPStatus TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, SchemaHash schema_hash,
- const TabletSharedPtr& tablet,
+ const BaseTabletSharedPtr& tablet,
bool update_meta, bool keep_files,
bool drop_old) {
// check if new tablet's meta is in store and add new tablet's meta to meta store
@@ -224,7 +233,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request,
// tablet_id exist but with different schema_hash, return an error(report task will
// eventually trigger its deletion).
if (_check_tablet_id_exist_unlocked(tablet_id)) {
- TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id, schema_hash);
+ BaseTabletSharedPtr tablet = _get_base_tablet_unlocked(tablet_id, schema_hash);
if (tablet != nullptr) {
LOG(INFO) << "success to create tablet. tablet already exist. tablet_id=" << tablet_id;
return OLAP_SUCCESS;
@@ -241,6 +250,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request,
// If the CreateTabletReq has base_tablet_id then it is a alter-tablet request
if (request.__isset.base_tablet_id && request.base_tablet_id > 0) {
is_schema_change = true;
+ // MemTablet does not support schema change, so it's safe to use TabletSharedPtr
base_tablet = _get_tablet_unlocked(request.base_tablet_id, request.base_schema_hash);
if (base_tablet == nullptr) {
LOG(WARNING) << "fail to create tablet(change schema), base tablet does not exist. "
@@ -258,7 +268,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request,
}
// set alter type to schema-change. it is useless
- TabletSharedPtr tablet = _internal_create_tablet_unlocked(
+ auto tablet = _internal_create_tablet_unlocked(
AlterTabletType::SCHEMA_CHANGE, request, is_schema_change, base_tablet.get(), stores);
if (tablet == nullptr) {
LOG(WARNING) << "fail to create tablet. tablet_id=" << request.tablet_id;
@@ -271,7 +281,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request,
return OLAP_SUCCESS;
}
-TabletSharedPtr TabletManager::_internal_create_tablet_unlocked(
+BaseTabletSharedPtr TabletManager::_internal_create_tablet_unlocked(
const AlterTabletType alter_type, const TCreateTabletReq& request,
const bool is_schema_change, const Tablet* base_tablet,
const std::vector<DataDir*>& data_dirs) {
@@ -312,17 +322,22 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked(
// 1. !is_schema_change: not in schema-change state;
// 2. request.base_tablet_id > 0: in schema-change state;
if (!is_schema_change || (request.__isset.base_tablet_id && request.base_tablet_id > 0)) {
- // Create init version if this is not a restore mode replica and request.version is set
- // bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode;
- // if (!in_restore_mode && request.__isset.version) {
- // create inital rowset before add it to storage engine could omit many locks
- res = _create_inital_rowset_unlocked(request, tablet.get());
- if (res != OLAP_SUCCESS) {
- LOG(WARNING) << "fail to create initial version for tablet. res=" << res;
- break;
+ if (!tablet->is_memory()) {
+ // Create init version if this is not a restore mode replica and request.version is set
+ // bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode;
+ // if (!in_restore_mode && request.__isset.version) {
+ // create inital rowset before add it to storage engine could omit many locks
+ res = _create_inital_rowset_unlocked(request, to_tablet(tablet).get());
+ if (res != OLAP_SUCCESS) {
+ LOG(WARNING) << "fail to create initial version for tablet. res=" << res;
+ break;
+ }
}
}
if (is_schema_change) {
+ if (tablet->is_memory()) {
+ LOG(FATAL) << "MemTablet schema change not supported";
+ }
if (request.__isset.base_tablet_id && request.base_tablet_id > 0) {
LOG(INFO) << "request for alter-tablet v2, do not add alter task to tablet";
// if this is a new alter tablet, has to set its state to not ready
@@ -331,8 +346,10 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked(
tablet->set_tablet_state(TabletState::TABLET_NOTREADY);
} else {
// add alter task to new tablet if it is a new tablet during schema change
- tablet->add_alter_task(base_tablet->tablet_id(), base_tablet->schema_hash(),
- vector<Version>(), alter_type);
+ to_tablet(tablet)->add_alter_task(base_tablet->tablet_id(),
+ base_tablet->schema_hash(),
+ vector<Version>(),
+ alter_type);
}
// 有可能出现以下2种特殊情况:
// 1. 因为操作系统时间跳变,导致新生成的表的creation_time小于旧表的creation_time时间
@@ -359,7 +376,7 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked(
// TODO(lingbin): The following logic seems useless, can be removed?
// Because if _add_tablet_unlocked() return OK, we must can get it from map.
- TabletSharedPtr tablet_ptr = _get_tablet_unlocked(new_tablet_id, new_schema_hash);
+ BaseTabletSharedPtr tablet_ptr = _get_base_tablet_unlocked(new_tablet_id, new_schema_hash);
if (tablet_ptr == nullptr) {
res = OLAP_ERR_TABLE_NOT_FOUND;
LOG(WARNING) << "fail to get tablet. res=" << res;
@@ -391,7 +408,7 @@ static string _gen_tablet_dir(const string& dir, int16_t shard_id, int64_t table
return path;
}
-TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked(
+BaseTabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked(
const TCreateTabletReq& request, const bool is_schema_change,
const Tablet* base_tablet, const std::vector<DataDir*>& data_dirs) {
string pending_id = StrCat(TABLET_ID_PREFIX, request.tablet_id);
@@ -427,16 +444,25 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked(
} else {
data_dir->add_pending_ids(pending_id);
Status st = FileUtils::create_dir(schema_hash_dir);
- if(!st.ok()) {
+ if (!st.ok()) {
LOG(WARNING) << "create dir fail. path=" << schema_hash_dir
<< " error=" << st.to_string();
continue;
}
}
- TabletSharedPtr new_tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir);
- DCHECK(new_tablet != nullptr);
- return new_tablet;
+ TTabletType::type ttype = request.__isset.tablet_type ?
+ request.tablet_type : TTabletType::TABLET_TYPE_DISK;
+ if (ttype == TTabletType::TABLET_TYPE_DISK) {
+ TabletSharedPtr new_tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir);
+ DCHECK(new_tablet != nullptr);
+ return std::static_pointer_cast<BaseTablet>(new_tablet);
+ } else {
+ memory::MemTabletSharedPtr new_tablet = memory::MemTablet::create_tablet_from_meta(
+ tablet_meta, data_dir);
+ DCHECK(new_tablet != nullptr);
+ return std::static_pointer_cast<BaseTablet>(new_tablet);
+ }
}
return nullptr;
}
@@ -462,13 +488,18 @@ OLAPStatus TabletManager::_drop_tablet_unlocked(
DorisMetrics::instance()->drop_tablet_requests_total.increment(1);
// Fetch tablet which need to be droped
- TabletSharedPtr to_drop_tablet = _get_tablet_unlocked(tablet_id, schema_hash);
- if (to_drop_tablet == nullptr) {
+ BaseTabletSharedPtr to_drop_base_tablet = _get_base_tablet_unlocked(tablet_id, schema_hash);
+ if (to_drop_base_tablet == nullptr) {
LOG(WARNING) << "fail to drop tablet because it does not exist. "
<< "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash;
return OLAP_SUCCESS;
}
+ if (to_drop_base_tablet->is_memory()) {
+ return _drop_tablet_directly_unlocked(tablet_id, schema_hash, keep_files);
+ }
+ TabletSharedPtr to_drop_tablet = to_tablet(to_drop_base_tablet);
+
// Try to get schema change info, we can drop tablet directly if it is not
// in schema-change state.
AlterTabletTaskSharedPtr alter_task = to_drop_tablet->alter_task();
@@ -562,7 +593,7 @@ OLAPStatus TabletManager::drop_tablets_on_error_root_path(
continue;
} else {
tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
- for (list<TabletSharedPtr>::iterator it = tablet_map[tablet_id].table_arr.begin();
+ for (auto it = tablet_map[tablet_id].table_arr.begin();
it != tablet_map[tablet_id].table_arr.end();) {
if ((*it)->equal(tablet_id, schema_hash)) {
// We should first remove tablet from partition_map to avoid iterator
@@ -579,6 +610,13 @@ OLAPStatus TabletManager::drop_tablets_on_error_root_path(
return res;
}
+BaseTabletSharedPtr TabletManager::get_base_tablet(TTabletId tablet_id, SchemaHash schema_hash,
+ bool include_deleted, std::string* err) {
+ RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
+ ReadLock rlock(&tablet_map_lock);
+ return _get_base_tablet_unlocked(tablet_id, schema_hash, include_deleted, err);
+}
+
TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema_hash,
bool include_deleted, string* err) {
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
@@ -588,8 +626,24 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema
TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash,
bool include_deleted, string* err) {
- TabletSharedPtr tablet;
- tablet = _get_tablet_unlocked(tablet_id, schema_hash);
+ BaseTabletSharedPtr ret = _get_base_tablet_unlocked(tablet_id, schema_hash, include_deleted,
+ err);
+ if (ret == nullptr) {
+ return TabletSharedPtr();
+ }
+ if (ret->is_memory()) {
+ LOG(FATAL) << "_get_tablet_unlocked get MemTablet";
+ return TabletSharedPtr();
+ }
+ return to_tablet(ret);
+}
+
+BaseTabletSharedPtr TabletManager::_get_base_tablet_unlocked(TTabletId tablet_id,
+ SchemaHash schema_hash,
+ bool include_deleted,
+ string* err) {
+ BaseTabletSharedPtr tablet;
+ tablet = _get_base_tablet_unlocked(tablet_id, schema_hash);
if (tablet == nullptr && include_deleted) {
ReadLock rlock(&_shutdown_tablets_lock);
for (auto& deleted_tablet : _shutdown_tablets) {
@@ -687,7 +741,12 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com
ReadLock tablet_map_rdlock(&_tablet_map_lock_array[i]);
tablet_map_t& tablet_map = _tablet_map_array[i];
for (tablet_map_t::value_type& table_ins : tablet_map){
- for (TabletSharedPtr& tablet_ptr : table_ins.second.table_arr) {
+ for (BaseTabletSharedPtr& base_tablet_ptr : table_ins.second.table_arr) {
+ if (base_tablet_ptr->is_memory()) {
+ // TODO: mem_tablet doesn't do compaction yet
+ continue;
+ }
+ TabletSharedPtr tablet_ptr = to_tablet(base_tablet_ptr);
AlterTabletTaskSharedPtr cur_alter_task = tablet_ptr->alter_task();
if (cur_alter_task != nullptr
&& cur_alter_task->alter_state() != ALTER_FINISHED
@@ -771,7 +830,8 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com
}
OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_id,
- TSchemaHash schema_hash, const string& meta_binary, bool update_meta, bool force, bool restore) {
+ TSchemaHash schema_hash, const string& meta_binary,
+ bool update_meta, bool force, bool restore) {
RWMutex& tablet_map_lock = _get_tablet_map_lock(tablet_id);
WriteLock wlock(&tablet_map_lock);
TabletMetaSharedPtr tablet_meta(new TabletMeta());
@@ -805,37 +865,77 @@ OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tab
tablet_meta->set_tablet_state(TABLET_RUNNING);
}
- TabletSharedPtr tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir);
- if (tablet == nullptr) {
- LOG(WARNING) << "fail to load tablet. tablet_id=" << tablet_id
- << ", schema_hash:" << schema_hash;
- return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR;
- }
+ if (tablet_meta->tablet_type() == TabletTypePB::TABLET_TYPE_DISK) {
+ TabletSharedPtr tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir);
+ if (tablet == nullptr) {
+ LOG(WARNING) << "fail to load tablet. tablet_id=" << tablet_id
+ << ", schema_hash:" << schema_hash;
+ return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR;
+ }
+
+ if (tablet_meta->tablet_state() == TABLET_SHUTDOWN) {
+ LOG(INFO) << "fail to load tablet because it is to be deleted. tablet_id=" << tablet_id
+ << " schema_hash=" << schema_hash << ", path=" << data_dir->path();
+ {
+ WriteLock shutdown_tablets_wlock(&_shutdown_tablets_lock);
+ _shutdown_tablets.push_back(tablet);
+ }
+ return OLAP_ERR_TABLE_ALREADY_DELETED_ERROR;
+ }
+ // NOTE: We do not check tablet's initial version here, because if BE restarts when
+ // one tablet is doing schema-change, we may meet empty tablet.
+ if (tablet->max_version().first == -1 && tablet->tablet_state() == TABLET_RUNNING) {
+ LOG(WARNING) << "fail to load tablet. it is in running state but without delta. "
+ << "tablet=" << tablet->full_name() << ", path=" << data_dir->path();
+ // tablet state is invalid, drop tablet
+ return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR;
+ }
- if (tablet_meta->tablet_state() == TABLET_SHUTDOWN) {
- LOG(INFO) << "fail to load tablet because it is to be deleted. tablet_id=" << tablet_id
- << " schema_hash=" << schema_hash << ", path=" << data_dir->path();
- {
- WriteLock shutdown_tablets_wlock(&_shutdown_tablets_lock);
- _shutdown_tablets.push_back(tablet);
+ RETURN_NOT_OK_LOG(tablet->init(), Substitute("tablet init failed. tablet=$0",
+ tablet->full_name()));
+ RETURN_NOT_OK_LOG(_add_tablet_unlocked(tablet_id, schema_hash,
+ std::static_pointer_cast<BaseTablet>(tablet),
+ update_meta, force),
+ Substitute("fail to add tablet. tablet=$0", tablet->full_name()));
+
+ return OLAP_SUCCESS;
+ } else {
+ memory::MemTabletSharedPtr tablet = memory::MemTablet::create_tablet_from_meta(
+ tablet_meta, data_dir);
+ if (tablet == nullptr) {
+ LOG(WARNING) << "fail to load tablet. tablet_id=" << tablet_id
+ << ", schema_hash:" << schema_hash;
+ return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR;
}
- return OLAP_ERR_TABLE_ALREADY_DELETED_ERROR;
- }
- // NOTE: We do not check tablet's initial version here, because if BE restarts when
- // one tablet is doing schema-change, we may meet empty tablet.
- if (tablet->max_version().first == -1 && tablet->tablet_state() == TABLET_RUNNING) {
- LOG(WARNING) << "fail to load tablet. it is in running state but without delta. "
- << "tablet=" << tablet->full_name() << ", path=" << data_dir->path();
- // tablet state is invalid, drop tablet
- return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR;
- }
- RETURN_NOT_OK_LOG(tablet->init(), Substitute("tablet init failed. tablet=$0",
- tablet->full_name()));
- RETURN_NOT_OK_LOG(_add_tablet_unlocked(tablet_id, schema_hash, tablet, update_meta, force),
- Substitute("fail to add tablet. tablet=$0", tablet->full_name()));
+ if (tablet_meta->tablet_state() == TABLET_SHUTDOWN) {
+ LOG(INFO) << "fail to load tablet because it is to be deleted. tablet_id=" << tablet_id
+ << " schema_hash=" << schema_hash << ", path=" << data_dir->path();
+ {
+ WriteLock shutdown_tablets_wlock(&_shutdown_tablets_lock);
+ _shutdown_tablets.push_back(tablet);
+ }
+ return OLAP_ERR_TABLE_ALREADY_DELETED_ERROR;
+ }
+ // NOTE: We do not check tablet's initial version here, because if BE restarts when
+ // one tablet is doing schema-change, we may meet empty tablet.
+ if (tablet->max_version().first == -1 && tablet->tablet_state() == TABLET_RUNNING) {
+ LOG(WARNING) << "fail to load tablet. it is in running state but without delta. "
+ << "tablet=" << tablet->full_name() << ", path=" << data_dir->path();
+ // tablet state is invalid, drop tablet
+ return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR;
+ }
+
+ RETURN_NOT_OK_LOG(tablet->init(), Substitute("tablet init failed. tablet=$0",
+ tablet->full_name()));
+ RETURN_NOT_OK_LOG(_add_tablet_unlocked(tablet_id, schema_hash,
+ std::static_pointer_cast<BaseTablet>(tablet),
+ update_meta, force),
+ Substitute("fail to add tablet. tablet=$0", tablet->full_name()));
+
+ return OLAP_SUCCESS;
+ }
- return OLAP_SUCCESS;
}
OLAPStatus TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
@@ -905,7 +1005,7 @@ OLAPStatus TabletManager::report_tablet_info(TTabletInfo* tablet_info) {
OLAPStatus res = OLAP_SUCCESS;
- TabletSharedPtr tablet = get_tablet(tablet_info->tablet_id, tablet_info->schema_hash);
+ auto tablet = get_base_tablet(tablet_info->tablet_id, tablet_info->schema_hash);
if (tablet == nullptr) {
LOG(WARNING) << "can't find tablet. " << " tablet=" << tablet_info->tablet_id
<< " schema_hash=" << tablet_info->schema_hash;
@@ -938,7 +1038,7 @@ OLAPStatus TabletManager::report_all_tablets_info(std::map<TTabletId, TTablet>*
uint64_t tablet_id = item.first;
TTablet t_tablet;
- for (TabletSharedPtr tablet_ptr : item.second.table_arr) {
+ for (const auto& tablet_ptr : item.second.table_arr) {
TTabletInfo tablet_info;
tablet_ptr->build_tablet_report_info(&tablet_info);
@@ -964,7 +1064,8 @@ OLAPStatus TabletManager::report_all_tablets_info(std::map<TTabletId, TTablet>*
OLAPStatus TabletManager::start_trash_sweep() {
{
std::vector<int64_t> tablets_to_clean;
- std::vector<TabletSharedPtr> all_tablets; // we use this vector to save all tablet ptr for saving lock time.
+ // we use this vector to save all tablet ptr for saving lock time.
+ std::vector<TabletSharedPtr> all_tablets;
for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) {
tablet_map_t& tablet_map = _tablet_map_array[i];
{
@@ -974,8 +1075,11 @@ OLAPStatus TabletManager::start_trash_sweep() {
if (item.second.table_arr.empty()) {
tablets_to_clean.push_back(item.first);
}
- for (TabletSharedPtr tablet : item.second.table_arr) {
- all_tablets.push_back(tablet);
+ for (BaseTabletSharedPtr& base_tablet : item.second.table_arr) {
+ // TODO: support MemTablet
+ if (!base_tablet->is_memory()) {
+ all_tablets.push_back(to_tablet(base_tablet));
+ }
}
}
}
@@ -1103,7 +1207,7 @@ bool TabletManager::try_schema_change_lock(TTabletId tablet_id) {
void TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_map,
size_t* tablet_count) {
- DCHECK(tablet_count != 0);
+ DCHECK(tablet_count != nullptr);
*tablet_count = 0;
for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) {
ReadLock rlock(&_tablet_map_lock_array[i]);
@@ -1138,7 +1242,7 @@ void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) {
for (int32 i = 0 ; i < _tablet_map_lock_shard_size; i++) {
ReadLock tablet_map_rdlock(&_tablet_map_lock_array[i]);
for (tablet_map_t::value_type& table_ins : _tablet_map_array[i]){
- for (TabletSharedPtr& tablet_ptr : table_ins.second.table_arr) {
+ for (BaseTabletSharedPtr& tablet_ptr : table_ins.second.table_arr) {
if (tablet_ptr->tablet_state() != TABLET_RUNNING) {
continue;
}
@@ -1148,7 +1252,10 @@ void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) {
|| !tablet_ptr->init_succeeded()) {
continue;
}
- related_tablets.push_back(tablet_ptr);
+ // TODO: do MemTablet need checkpoint?
+ if (!tablet_ptr->is_memory()) {
+ related_tablets.push_back(to_tablet(tablet_ptr));
+ }
}
}
}
@@ -1170,7 +1277,7 @@ void TabletManager::_build_tablet_stat() {
TTabletStat stat;
stat.tablet_id = item.first;
- for (TabletSharedPtr tablet : item.second.table_arr) {
+ for (BaseTabletSharedPtr tablet : item.second.table_arr) {
// TODO(lingbin): if it is nullptr, why is it not deleted?
if (tablet == nullptr) {
continue;
@@ -1315,7 +1422,7 @@ OLAPStatus TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& r
OLAPStatus TabletManager::_drop_tablet_directly_unlocked(
TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) {
- TabletSharedPtr dropped_tablet = _get_tablet_unlocked(tablet_id, schema_hash);
+ BaseTabletSharedPtr dropped_tablet = _get_base_tablet_unlocked(tablet_id, schema_hash);
if (dropped_tablet == nullptr) {
LOG(WARNING) << "fail to drop tablet because it does not exist. "
<< " tablet_id=" << tablet_id
@@ -1323,16 +1430,16 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked(
return OLAP_ERR_TABLE_NOT_FOUND;
}
tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
- list<TabletSharedPtr>& candidate_tablets = tablet_map[tablet_id].table_arr;
- list<TabletSharedPtr>::iterator it = candidate_tablets.begin();
+ auto& candidate_tablets = tablet_map[tablet_id].table_arr;
+ auto it = candidate_tablets.begin();
while (it != candidate_tablets.end()) {
if (!(*it)->equal(tablet_id, schema_hash)) {
++it;
continue;
}
- TabletSharedPtr tablet = *it;
- _remove_tablet_from_partition(*(*it));
+ auto tablet = *it;
+ _remove_tablet_from_partition(*tablet);
it = candidate_tablets.erase(it);
if (!keep_files) {
// drop tablet will update tablet meta, should lock
@@ -1359,12 +1466,13 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked(
return OLAP_SUCCESS;
}
-TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash) {
+BaseTabletSharedPtr TabletManager::_get_base_tablet_unlocked(TTabletId tablet_id,
+ SchemaHash schema_hash) {
VLOG(3) << "begin to get tablet. tablet_id=" << tablet_id << ", schema_hash=" << schema_hash;
tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
tablet_map_t::iterator it = tablet_map.find(tablet_id);
if (it != tablet_map.end()) {
- for (TabletSharedPtr tablet : it->second.table_arr) {
+ for (BaseTabletSharedPtr tablet : it->second.table_arr) {
CHECK(tablet != nullptr) << "tablet is nullptr. tablet_id=" << tablet_id;
if (tablet->equal(tablet_id, schema_hash)) {
VLOG(3) << "get tablet success. tablet_id=" << tablet_id
@@ -1376,16 +1484,28 @@ TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaH
VLOG(3) << "fail to get tablet. tablet_id=" << tablet_id << ", schema_hash=" << schema_hash;
// Return nullptr tablet if fail
- TabletSharedPtr tablet;
- return tablet;
+ return nullptr;
+}
+
+TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash) {
+ BaseTabletSharedPtr ret = _get_base_tablet_unlocked(tablet_id, schema_hash);
+ if (ret == nullptr) {
+ return TabletSharedPtr();
+ }
+ if (ret->is_memory()) {
+ LOG(FATAL) << "fail to get TabletSharedPtr from MemTabletSharedPtr. tablet_id=" << tablet_id
+ << ", schema_hash=" << schema_hash;
+ return TabletSharedPtr();
+ }
+ return to_tablet(ret);
}
-void TabletManager::_add_tablet_to_partition(const Tablet& tablet) {
+void TabletManager::_add_tablet_to_partition(const BaseTablet& tablet) {
WriteLock wlock(&_partition_tablet_map_lock);
_partition_tablet_map[tablet.partition_id()].insert(tablet.get_tablet_info());
}
-void TabletManager::_remove_tablet_from_partition(const Tablet& tablet) {
+void TabletManager::_remove_tablet_from_partition(const BaseTablet& tablet) {
WriteLock wlock(&_partition_tablet_map_lock);
_partition_tablet_map[tablet.partition_id()].erase(tablet.get_tablet_info());
if (_partition_tablet_map[tablet.partition_id()].empty()) {
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 347fb10..745d7d6 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -71,6 +71,9 @@ public:
TabletSharedPtr find_best_tablet_to_compaction(CompactionType compaction_type, DataDir* data_dir);
+ BaseTabletSharedPtr get_base_tablet(TTabletId tablet_id, SchemaHash schema_hash,
+ bool include_deleted = false, std::string* err = nullptr);
+
TabletSharedPtr get_tablet(TTabletId tablet_id, SchemaHash schema_hash,
bool include_deleted = false, std::string* err = nullptr);
@@ -140,10 +143,10 @@ private:
// OLAP_ERR_TABLE_INSERT_DUPLICATION_ERROR, if find duplication
// OLAP_ERR_NOT_INITED, if not inited
OLAPStatus _add_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash,
- const TabletSharedPtr& tablet, bool update_meta, bool force);
+ const BaseTabletSharedPtr& tablet, bool update_meta, bool force);
OLAPStatus _add_tablet_to_map_unlocked(TTabletId tablet_id, SchemaHash schema_hash,
- const TabletSharedPtr& tablet, bool update_meta,
+ const BaseTabletSharedPtr& tablet, bool update_meta,
bool keep_files, bool drop_old);
bool _check_tablet_id_exist_unlocked(TTabletId tablet_id);
@@ -156,16 +159,19 @@ private:
OLAPStatus _drop_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash, bool keep_files);
+ BaseTabletSharedPtr _get_base_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash);
+ BaseTabletSharedPtr _get_base_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash,
+ bool include_deleted, std::string* err);
TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash);
TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id, SchemaHash schema_hash,
bool include_deleted, std::string* err);
- TabletSharedPtr _internal_create_tablet_unlocked(const AlterTabletType alter_type,
- const TCreateTabletReq& request,
- const bool is_schema_change,
- const Tablet* base_tablet,
- const std::vector<DataDir*>& data_dirs);
- TabletSharedPtr _create_tablet_meta_and_dir_unlocked(const TCreateTabletReq& request,
+ BaseTabletSharedPtr _internal_create_tablet_unlocked(const AlterTabletType alter_type,
+ const TCreateTabletReq& request,
+ const bool is_schema_change,
+ const Tablet* base_tablet,
+ const std::vector<DataDir*>& data_dirs);
+ BaseTabletSharedPtr _create_tablet_meta_and_dir_unlocked(const TCreateTabletReq& request,
const bool is_schema_change,
const Tablet* base_tablet,
const std::vector<DataDir*>& data_dirs);
@@ -177,9 +183,9 @@ private:
void _build_tablet_stat();
- void _add_tablet_to_partition(const Tablet& tablet);
+ void _add_tablet_to_partition(const BaseTablet& tablet);
- void _remove_tablet_from_partition(const Tablet& tablet);
+ void _remove_tablet_from_partition(const BaseTablet& tablet);
inline RWMutex& _get_tablet_map_lock(TTabletId tabletId);
@@ -193,7 +199,7 @@ private:
// The first element(i.e. tablet_arr[0]) is the base tablet. When we add new tablet
// to tablet_arr, we will sort all the elements in create-time ascending order,
// which will ensure the first one is base-tablet
- std::list<TabletSharedPtr> table_arr;
+ std::list<BaseTabletSharedPtr> table_arr;
};
// tablet_id -> TabletInstances
typedef std::unordered_map<int64_t, TableInstances> tablet_map_t;
@@ -209,7 +215,7 @@ private:
RWMutex _shutdown_tablets_lock;
// partition_id => tablet_info
std::map<int64_t, std::set<TabletInfo>> _partition_tablet_map;
- std::vector<TabletSharedPtr> _shutdown_tablets;
+ std::vector<BaseTabletSharedPtr> _shutdown_tablets;
std::mutex _tablet_stat_mutex;
// cache to save tablets' statistics, such as data-size and row-count
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 4b9a45f..a4ef612 100755
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -91,7 +91,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id,
tablet_meta_pb.set_tablet_state(PB_RUNNING);
*(tablet_meta_pb.mutable_tablet_uid()) = tablet_uid.to_proto();
tablet_meta_pb.set_tablet_type(tabletType == TTabletType::TABLET_TYPE_MEMORY ?
- TabletTypePB::TABLET_TYPE_DISK : TabletTypePB::TABLET_TYPE_MEMORY);
+ TabletTypePB::TABLET_TYPE_MEMORY : TabletTypePB::TABLET_TYPE_DISK);
TabletSchemaPB* schema = tablet_meta_pb.mutable_schema();
schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block);
diff --git a/be/test/olap/memory/mem_tablet_test.cpp b/be/test/olap/memory/mem_tablet_test.cpp
index 979f462..c0c6f9b 100644
--- a/be/test/olap/memory/mem_tablet_test.cpp
+++ b/be/test/olap/memory/mem_tablet_test.cpp
@@ -70,7 +70,7 @@ TEST(MemTablet, writescan) {
new TabletMeta(1, 1, 1, 1, 1, tschema, static_cast<uint32_t>(sc->cid_size()),
col_idx_to_unique_id, TabletUid(1, 1), TTabletType::TABLET_TYPE_MEMORY));
std::shared_ptr<MemTablet> tablet = MemTablet::create_tablet_from_meta(tablet_meta, nullptr);
- ASSERT_TRUE(tablet->init().ok());
+ ASSERT_EQ(tablet->init(), OLAP_SUCCESS);
uint64_t cur_version = 0;
vector<TData> alldata(num_insert);
diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift
index ded383b..345b6ce 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -38,6 +38,7 @@ struct TTabletInfo {
12: optional bool used
13: optional Types.TPartitionId partition_id
14: optional bool is_in_memory
+ 15: optional AgentService.TTabletType tablet_type
}
struct TFinishTaskRequest {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org