You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by pa...@apache.org on 2022/06/07 07:05:02 UTC

[incubator-doris] branch master updated: [Feature] [Vectorized] Some pre-refactorings or interface additions for schema change (#9811)

This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f2aa5f32b8 [Feature] [Vectorized] Some pre-refactorings or interface additions for schema change (#9811)
f2aa5f32b8 is described below

commit f2aa5f32b8ab81308e8d27d25121a14e41961706
Author: Pxl <px...@qq.com>
AuthorDate: Tue Jun 7 15:04:57 2022 +0800

    [Feature] [Vectorized] Some pre-refactorings or interface additions for schema change (#9811)
    
    Some pre-refactorings or interface additions for schema change
---
 be/src/olap/column_mapping.h                   | 14 ++++---
 be/src/olap/memtable.cpp                       | 14 ++-----
 be/src/olap/memtable.h                         | 10 ++---
 be/src/olap/olap_common.h                      | 27 +++++++------
 be/src/olap/rowset/alpha_rowset_writer.h       |  9 ++---
 be/src/olap/rowset/beta_rowset_reader.h        |  5 +--
 be/src/olap/rowset/beta_rowset_writer.h        |  7 +---
 be/src/olap/rowset/rowset.h                    | 17 ++++-----
 be/src/olap/rowset/rowset_writer.h             |  8 +---
 be/src/olap/rowset/rowset_writer_context.h     | 28 ++++++++++++--
 be/src/olap/storage_engine.cpp                 | 29 +++++---------
 be/src/olap/storage_engine.h                   |  5 ++-
 be/src/olap/tablet.h                           |  3 +-
 be/src/olap/tablet_schema.cpp                  | 22 ++++++++++-
 be/src/olap/tablet_schema.h                    |  7 +++-
 be/src/vec/common/cow.h                        | 33 ++++++++--------
 be/src/vec/common/string_ref.h                 |  2 +
 be/src/vec/exprs/vexpr_context.cpp             | 12 ++++--
 be/src/vec/exprs/vexpr_context.h               |  4 +-
 be/src/vec/functions/function_hash.cpp         |  2 +-
 be/src/vec/functions/function_ifnull.cpp       |  2 +-
 be/src/vec/functions/functions_geo.cpp         |  2 +-
 be/src/vec/functions/simple_function_factory.h | 12 +++---
 be/src/vec/olap/block_reader.cpp               | 24 +++---------
 be/src/vec/olap/olap_data_convertor.cpp        | 52 +++++---------------------
 be/src/vec/olap/olap_data_convertor.h          |  6 +++
 26 files changed, 173 insertions(+), 183 deletions(-)

diff --git a/be/src/olap/column_mapping.h b/be/src/olap/column_mapping.h
index 929a1b3980..de82705a91 100644
--- a/be/src/olap/column_mapping.h
+++ b/be/src/olap/column_mapping.h
@@ -15,16 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef DORIS_BE_SRC_OLAP_COLUMN_MAPPING_H
-#define DORIS_BE_SRC_OLAP_COLUMN_MAPPING_H
+#pragma once
 
+#include <gen_cpp/Exprs_types.h>
+
+#include <memory>
 namespace doris {
 
 class WrapperField;
 
 struct ColumnMapping {
     ColumnMapping() : ref_column(-1), default_value(nullptr) {}
-    virtual ~ColumnMapping() {}
+    virtual ~ColumnMapping() = default;
 
     // <0: use default value
     // >=0: use origin column
@@ -33,9 +35,9 @@ struct ColumnMapping {
     WrapperField* default_value;
     // materialize view transform function used in schema change
     std::string materialized_function;
+    std::shared_ptr<TExpr> expr;
 };
 
-typedef std::vector<ColumnMapping> SchemaMapping;
+using SchemaMapping = std::vector<ColumnMapping>;
 
-} // namespace doris
-#endif // DORIS_BE_SRC_COLUMN_MAPPING_H
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index e2fefd0149..c8c3d78e2a 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -91,17 +91,9 @@ void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescript
 
 void MemTable::_init_agg_functions(const vectorized::Block* block) {
     for (uint32_t cid = _schema->num_key_columns(); cid < _schema->num_columns(); ++cid) {
-        FieldAggregationMethod agg_method = _tablet_schema->column(cid).aggregation();
-        std::string agg_name = TabletColumn::get_string_by_aggregation_type(agg_method) +
-                               vectorized::AGG_LOAD_SUFFIX;
-        std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(),
-                       [](unsigned char c) { return std::tolower(c); });
-
-        // create aggregate function
-        vectorized::DataTypes argument_types {block->get_data_type(cid)};
         vectorized::AggregateFunctionPtr function =
-                vectorized::AggregateFunctionSimpleFactory::instance().get(
-                        agg_name, argument_types, {}, argument_types.back()->is_nullable());
+                _tablet_schema->column(cid).get_aggregate_function({block->get_data_type(cid)},
+                                                                   vectorized::AGG_LOAD_SUFFIX);
 
         DCHECK(function != nullptr);
         _agg_functions[cid] = function;
@@ -316,7 +308,7 @@ void MemTable::shrink_memtable_by_agg() {
     _collect_vskiplist_results<false>();
 }
 
-bool MemTable::is_flush() {
+bool MemTable::is_flush() const {
     return memory_usage() >= config::write_buffer_size;
 }
 
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index d8f16ba4c5..c0b4c92839 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -56,7 +56,7 @@ public:
 
     void shrink_memtable_by_agg();
 
-    bool is_flush();
+    bool is_flush() const;
 
     bool need_to_agg();
 
@@ -72,7 +72,7 @@ private:
     class RowCursorComparator : public RowComparator {
     public:
         RowCursorComparator(const Schema* schema);
-        int operator()(const char* left, const char* right) const;
+        int operator()(const char* left, const char* right) const override;
 
     private:
         const Schema* _schema;
@@ -121,9 +121,9 @@ private:
     };
 
 private:
-    typedef SkipList<char*, RowComparator> Table;
-    typedef Table::key_type TableKey;
-    typedef SkipList<RowInBlock*, RowInBlockComparator> VecTable;
+    using Table = SkipList<char*, RowComparator>;
+    using TableKey = Table::key_type;
+    using VecTable = SkipList<RowInBlock*, RowInBlockComparator>;
 
 public:
     /// The iterator of memtable, so that the data in this memtable
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index c8256d0f2e..304157e4e4 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -36,17 +36,16 @@
 #include "util/hash_util.hpp"
 #include "util/uid_util.h"
 
-#define LOW_56_BITS 0x00ffffffffffffff
-
 namespace doris {
 
-static const int64_t MAX_ROWSET_ID = 1L << 56;
+static constexpr int64_t MAX_ROWSET_ID = 1L << 56;
+static constexpr int64_t LOW_56_BITS = 0x00ffffffffffffff;
 
-typedef int32_t SchemaHash;
-typedef __int128 int128_t;
-typedef unsigned __int128 uint128_t;
+using SchemaHash = int32_t;
+using int128_t = __int128;
+using uint128_t = unsigned __int128;
 
-typedef UniqueId TabletUid;
+using TabletUid = UniqueId;
 
 enum CompactionType { BASE_COMPACTION = 1, CUMULATIVE_COMPACTION = 2 };
 
@@ -200,6 +199,12 @@ struct Version {
     Version(int64_t first_, int64_t second_) : first(first_), second(second_) {}
     Version() : first(0), second(0) {}
 
+    static Version mock() {
+        // SchemaChange's external sorting writes temporary versions (e.g. 999, 1000, 1001); to avoid cache conflicts with
+        // real versions, this returns a fixed placeholder version range built from large numbers: [1 << 28, 1 << 29].
+        return Version(1 << 28, 1 << 29);
+    }
+
     friend std::ostream& operator<<(std::ostream& os, const Version& version);
 
     bool operator!=(const Version& rhs) const { return first != rhs.first || second != rhs.second; }
@@ -211,7 +216,7 @@ struct Version {
     }
 };
 
-typedef std::vector<Version> Versions;
+using Versions = std::vector<Version>;
 
 inline std::ostream& operator<<(std::ostream& os, const Version& version) {
     return os << "[" << version.first << "-" << version.second << "]";
@@ -305,11 +310,11 @@ struct OlapReaderStatistics {
     int64_t general_debug_ns[GENERAL_DEBUG_COUNT] = {};
 };
 
-typedef uint32_t ColumnId;
+using ColumnId = uint32_t;
 // Column unique id set
-typedef std::set<uint32_t> UniqueIdSet;
+using UniqueIdSet = std::set<uint32_t>;
 // Column unique Id -> column id map
-typedef std::map<ColumnId, ColumnId> UniqueIdToColumnIdMap;
+using UniqueIdToColumnIdMap = std::map<ColumnId, ColumnId>;
 
 // 8 bit rowset id version
 // 56 bit, inc number from 1
diff --git a/be/src/olap/rowset/alpha_rowset_writer.h b/be/src/olap/rowset/alpha_rowset_writer.h
index a4f7dfa479..411beabf62 100644
--- a/be/src/olap/rowset/alpha_rowset_writer.h
+++ b/be/src/olap/rowset/alpha_rowset_writer.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef DORIS_BE_SRC_OLAP_ROWSET_ALPHA_ROWSET_WRITER_H
-#define DORIS_BE_SRC_OLAP_ROWSET_ALPHA_ROWSET_WRITER_H
+#pragma once
 
 #include <vector>
 
@@ -31,7 +30,7 @@ enum WriterState { WRITER_CREATED, WRITER_INITED, WRITER_FLUSHED };
 class AlphaRowsetWriter : public RowsetWriter {
 public:
     AlphaRowsetWriter();
-    virtual ~AlphaRowsetWriter();
+    ~AlphaRowsetWriter() override;
 
     Status init(const RowsetWriterContext& rowset_writer_context) override;
 
@@ -51,7 +50,7 @@ public:
 
     Version version() override { return _rowset_writer_context.version; }
 
-    int64_t num_rows() override { return _num_rows_written; }
+    int64_t num_rows() const override { return _num_rows_written; }
 
     RowsetId rowset_id() override { return _rowset_writer_context.rowset_id; }
 
@@ -84,5 +83,3 @@ private:
 };
 
 } // namespace doris
-
-#endif // DORIS_BE_SRC_OLAP_ROWSET_ALPHA_ROWSET_WRITER_H
diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h
index 7ba7b6a945..885d16b8c7 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_READER_H
-#define DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_READER_H
+#pragma once
 
 #include "olap/iterators.h"
 #include "olap/row_block.h"
@@ -73,5 +72,3 @@ private:
 };
 
 } // namespace doris
-
-#endif //DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_READER_H
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 570a8f7650..69a957a3ce 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_WRITER_H
-#define DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_WRITER_H
+#pragma once
 
 #include "olap/rowset/rowset_writer.h"
 #include "vector"
@@ -63,7 +62,7 @@ public:
 
     Version version() override { return _context.version; }
 
-    int64_t num_rows() override { return _num_rows_written; }
+    int64_t num_rows() const override { return _num_rows_written; }
 
     RowsetId rowset_id() override { return _context.rowset_id; }
 
@@ -103,5 +102,3 @@ private:
 };
 
 } // namespace doris
-
-#endif //DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_WRITER_H
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 8ca1ee5551..f1fded38ba 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_H
-#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_H
+#pragma once
 
 #include <atomic>
 #include <memory>
@@ -106,7 +105,7 @@ private:
 
 class Rowset : public std::enable_shared_from_this<Rowset> {
 public:
-    virtual ~Rowset() {}
+    virtual ~Rowset() = default;
 
     // Open all segment files in this rowset and load necessary metadata.
     // - `use_cache` : whether to use fd cache, only applicable to alpha rowset now
@@ -147,15 +146,15 @@ public:
     size_t num_rows() const { return rowset_meta()->num_rows(); }
     Version version() const { return rowset_meta()->version(); }
     RowsetId rowset_id() const { return rowset_meta()->rowset_id(); }
-    int64_t creation_time() { return rowset_meta()->creation_time(); }
+    int64_t creation_time() const { return rowset_meta()->creation_time(); }
     PUniqueId load_id() const { return rowset_meta()->load_id(); }
     int64_t txn_id() const { return rowset_meta()->txn_id(); }
     int64_t partition_id() const { return rowset_meta()->partition_id(); }
     // flag for push delete rowset
     bool delete_flag() const { return rowset_meta()->delete_flag(); }
     int64_t num_segments() const { return rowset_meta()->num_segments(); }
-    void to_rowset_pb(RowsetMetaPB* rs_meta) { return rowset_meta()->to_rowset_pb(rs_meta); }
-    const RowsetMetaPB& get_rowset_pb() { return rowset_meta()->get_rowset_pb(); }
+    void to_rowset_pb(RowsetMetaPB* rs_meta) const { return rowset_meta()->to_rowset_pb(rs_meta); }
+    const RowsetMetaPB& get_rowset_pb() const { return rowset_meta()->get_rowset_pb(); }
     KeysType keys_type() { return _schema->keys_type(); }
 
     // remove all files in this rowset
@@ -219,7 +218,9 @@ public:
 
     void set_need_delete_file() { _need_delete_file = true; }
 
-    bool contains_version(Version version) { return rowset_meta()->version().contains(version); }
+    bool contains_version(Version version) const {
+        return rowset_meta()->version().contains(version);
+    }
 
     FilePathDesc rowset_path_desc() { return _rowset_path_desc; }
 
@@ -290,5 +291,3 @@ protected:
 };
 
 } // namespace doris
-
-#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_H
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 22239f4eaf..49824cac01 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_H
-#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_H
+#pragma once
 
 #include "gen_cpp/olap_file.pb.h"
 #include "gen_cpp/types.pb.h"
@@ -30,7 +29,6 @@ namespace doris {
 
 struct ContiguousRow;
 class MemTable;
-class RowCursor;
 
 class RowsetWriter {
 public:
@@ -74,7 +72,7 @@ public:
 
     virtual Version version() = 0;
 
-    virtual int64_t num_rows() = 0;
+    virtual int64_t num_rows() const = 0;
 
     virtual RowsetId rowset_id() = 0;
 
@@ -85,5 +83,3 @@ private:
 };
 
 } // namespace doris
-
-#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_H
diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h
index b93c4633d6..539845fe5f 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -15,11 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_CONTEXT_H
-#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_CONTEXT_H
+#pragma once
 
 #include "gen_cpp/olap_file.pb.h"
 #include "olap/data_dir.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
 #include "olap/tablet_schema.h"
 
 namespace doris {
@@ -42,6 +43,27 @@ struct RowsetWriterContext {
         load_id.set_hi(0);
         load_id.set_lo(0);
     }
+
+    static RowsetWriterContext create(const Version& version, TabletSharedPtr new_tablet,
+                                      RowsetTypePB new_rowset_type,
+                                      SegmentsOverlapPB segments_overlap) {
+        RowsetWriterContext context;
+        context.rowset_id = StorageEngine::instance()->next_rowset_id();
+        context.tablet_uid = new_tablet->tablet_uid();
+        context.tablet_id = new_tablet->tablet_id();
+        context.partition_id = new_tablet->partition_id();
+        context.tablet_schema_hash = new_tablet->schema_hash();
+        context.rowset_type = new_rowset_type;
+        context.path_desc = new_tablet->tablet_path_desc();
+        context.tablet_schema = &(new_tablet->tablet_schema());
+        context.data_dir = new_tablet->data_dir();
+        context.rowset_state = VISIBLE;
+        context.version = version;
+        context.segments_overlap = segments_overlap;
+
+        return context;
+    }
+
     RowsetId rowset_id;
     int64_t tablet_id;
     int64_t tablet_schema_hash;
@@ -74,5 +96,3 @@ struct RowsetWriterContext {
 };
 
 } // namespace doris
-
-#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_CONTEXT_H
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 35dc3dd1e1..c872851352 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -248,7 +248,7 @@ Status StorageEngine::_init_store_map() {
         _store_map.emplace(store->path(), store);
     }
 
-    std::string stream_load_record_path = "";
+    std::string stream_load_record_path;
     if (!tmp_stores.empty()) {
         stream_load_record_path = tmp_stores[0]->path();
     }
@@ -1080,7 +1080,8 @@ void StorageEngine::notify_listeners() {
 }
 
 Status StorageEngine::execute_task(EngineTask* task) {
-    {
+    auto lock_related_tablets = [&]() -> std::vector<std::unique_lock<std::shared_mutex>> {
+        // add write lock to all related tablets
         std::vector<TabletInfo> tablet_infos;
         task->get_related_tablets(&tablet_infos);
         sort(tablet_infos.begin(), tablet_infos.end());
@@ -1096,7 +1097,11 @@ Status StorageEngine::execute_task(EngineTask* task) {
                              << tablet_info.tablet_id;
             }
         }
-        // add write lock to all related tablets
+        return wrlocks;
+    };
+
+    {
+        auto wrlocks = lock_related_tablets();
         Status prepare_status = task->prepare();
         if (prepare_status != Status::OK()) {
             return prepare_status;
@@ -1110,23 +1115,7 @@ Status StorageEngine::execute_task(EngineTask* task) {
     }
 
     {
-        std::vector<TabletInfo> tablet_infos;
-        // related tablets may be changed after execute task, so that get them here again
-        task->get_related_tablets(&tablet_infos);
-        sort(tablet_infos.begin(), tablet_infos.end());
-        std::vector<TabletSharedPtr> related_tablets;
-        std::vector<std::unique_lock<std::shared_mutex>> wrlocks;
-        for (TabletInfo& tablet_info : tablet_infos) {
-            TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_info.tablet_id);
-            if (tablet != nullptr) {
-                related_tablets.push_back(tablet);
-                wrlocks.push_back(std::unique_lock<std::shared_mutex>(tablet->get_header_lock()));
-            } else {
-                LOG(WARNING) << "could not get tablet before finish tabletid: "
-                             << tablet_info.tablet_id;
-            }
-        }
-        // add write lock to all related tablets
+        auto wrlocks = lock_related_tablets();
         Status fin_status = task->finish();
         return fin_status;
     }
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index ce33223c5a..df6e78874b 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -42,7 +42,6 @@
 #include "olap/olap_meta.h"
 #include "olap/options.h"
 #include "olap/rowset/rowset_id_generator.h"
-#include "olap/tablet.h"
 #include "olap/tablet_manager.h"
 #include "olap/task/engine_task.h"
 #include "olap/txn_manager.h"
@@ -104,7 +103,9 @@ public:
     std::vector<DataDir*> get_stores_for_create_tablet(TStorageMedium::type storage_medium);
     DataDir* get_store(const std::string& path);
 
-    uint32_t available_storage_medium_type_count() { return _available_storage_medium_type_count; }
+    uint32_t available_storage_medium_type_count() const {
+        return _available_storage_medium_type_count;
+    }
 
     Status set_cluster_id(int32_t cluster_id);
     int32_t effective_cluster_id() const { return _effective_cluster_id; }
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index a7f1830ef1..80ebfed636 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -33,7 +33,6 @@
 #include "olap/olap_define.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_reader.h"
-#include "olap/rowset/rowset_writer.h"
 #include "olap/tablet_meta.h"
 #include "olap/tuple.h"
 #include "olap/utils.h"
@@ -48,6 +47,8 @@ class TabletMeta;
 class CumulativeCompactionPolicy;
 class CumulativeCompaction;
 class BaseCompaction;
+class RowsetWriter;
+struct RowsetWriterContext;
 
 using TabletSharedPtr = std::shared_ptr<Tablet>;
 
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 6e9703f569..683dff9d62 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -18,8 +18,9 @@
 #include "olap/tablet_schema.h"
 
 #include "tablet_meta.h"
+#include "vec/aggregate_functions/aggregate_function_reader.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
 #include "vec/core/block.h"
-#include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_factory.hpp"
 
 namespace doris {
@@ -405,6 +406,16 @@ void TabletColumn::add_sub_column(TabletColumn& sub_column) {
     _sub_column_count += 1;
 }
 
+vectorized::AggregateFunctionPtr TabletColumn::get_aggregate_function(
+        vectorized::DataTypes argument_types, std::string suffix) const {
+    std::string agg_name = TabletColumn::get_string_by_aggregation_type(_aggregation) + suffix;
+    std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(),
+                   [](unsigned char c) { return std::tolower(c); });
+
+    return vectorized::AggregateFunctionSimpleFactory::instance().get(
+            agg_name, argument_types, {}, argument_types.back()->is_nullable());
+}
+
 void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
     _keys_type = schema.keys_type();
     _num_columns = 0;
@@ -525,6 +536,15 @@ vectorized::Block TabletSchema::create_block(
     return block;
 }
 
+vectorized::Block TabletSchema::create_block() const {
+    vectorized::Block block;
+    for (const auto& col : _cols) {
+        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(col);
+        block.insert({data_type->create_column(), data_type, col.name()});
+    }
+    return block;
+}
+
 bool operator==(const TabletColumn& a, const TabletColumn& b) {
     if (a._unique_id != b._unique_id) return false;
     if (a._col_name != b._col_name) return false;
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 7c3209ee82..f0a5b72157 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -23,6 +23,8 @@
 #include "gen_cpp/segment_v2.pb.h"
 #include "olap/olap_define.h"
 #include "olap/types.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/data_types/data_type.h"
 
 namespace doris {
 namespace vectorized {
@@ -62,9 +64,11 @@ public:
     size_t index_length() const { return _index_length; }
     void set_index_length(size_t index_length) { _index_length = index_length; }
     FieldAggregationMethod aggregation() const { return _aggregation; }
+    vectorized::AggregateFunctionPtr get_aggregate_function(vectorized::DataTypes argument_types,
+                                                            std::string suffix) const;
     int precision() const { return _precision; }
     int frac() const { return _frac; }
-    bool visible() { return _visible; }
+    bool visible() const { return _visible; }
     // Add a sub column.
     void add_sub_column(TabletColumn& sub_column);
 
@@ -151,6 +155,7 @@ public:
     vectorized::Block create_block(
             const std::vector<uint32_t>& return_columns,
             const std::unordered_set<uint32_t>* tablet_columns_need_convert_null = nullptr) const;
+    vectorized::Block create_block() const;
 
 private:
     // Only for unit test.
diff --git a/be/src/vec/common/cow.h b/be/src/vec/common/cow.h
index 27f3e864a6..1bab30ddc2 100644
--- a/be/src/vec/common/cow.h
+++ b/be/src/vec/common/cow.h
@@ -20,31 +20,25 @@
 
 #pragma once
 
-#include <boost/smart_ptr/intrusive_ptr.hpp>
-#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <atomic>
 #include <initializer_list>
 
 /** Copy-on-write shared ptr.
   * Allows to work with shared immutable objects and sometimes unshare and mutate you own unique copy.
   *
   * Usage:
-
     class Column : public COW<Column>
     {
     private:
         friend class COW<Column>;
-
         /// Leave all constructors in private section. They will be avaliable through 'create' method.
         Column();
-
         /// Provide 'clone' method. It can be virtual if you want polymorphic behaviour.
         virtual Column * clone() const;
     public:
         /// Correctly use const qualifiers in your interface.
-
         virtual ~Column() {}
     };
-
   * It will provide 'create' and 'mutate' methods.
   * And 'Ptr' and 'MutablePtr' types.
   * Ptr is refcounted pointer to immutable object.
@@ -63,9 +57,7 @@
     Column::Ptr x = Column::create(1);
     /// Sharing single immutable object in two ptrs.
     Column::Ptr y = x;
-
     /// Now x and y are shared.
-
     /// Change value of x.
     {
         /// Creating mutable ptr. It can clone an object under the hood if it was shared.
@@ -75,9 +67,7 @@
         /// Assigning pointer 'x' to mutated object.
         x = std::move(mutate_x);
     }
-
     /// Now x and y are unshared and have different values.
-
   * Note. You may have heard that COW is bad practice.
   * Actually it is, if your values are small or if copying is done implicitly.
   * This is the case for string implementations.
@@ -120,20 +110,28 @@ protected:
         intrusive_ptr() : t(nullptr) {}
 
         intrusive_ptr(T* t, bool add_ref = true) : t(t) {
-            if (t && add_ref) ((std::remove_const_t<T>*)t)->add_ref();
+            if (t && add_ref) {
+                ((std::remove_const_t<T>*)t)->add_ref();
+            }
         }
 
         template <typename U>
         intrusive_ptr(intrusive_ptr<U> const& rhs) : t(rhs.get()) {
-            if (t) ((std::remove_const_t<T>*)t)->add_ref();
+            if (t) {
+                ((std::remove_const_t<T>*)t)->add_ref();
+            }
         }
 
         intrusive_ptr(intrusive_ptr const& rhs) : t(rhs.get()) {
-            if (t) ((std::remove_const_t<T>*)t)->add_ref();
+            if (t) {
+                ((std::remove_const_t<T>*)t)->add_ref();
+            }
         }
 
         ~intrusive_ptr() {
-            if (t) ((std::remove_const_t<T>*)t)->release_ref();
+            if (t) {
+                ((std::remove_const_t<T>*)t)->release_ref();
+            }
         }
 
         template <typename U>
@@ -313,10 +311,11 @@ public:
 
 protected:
     MutablePtr shallow_mutate() const {
-        if (this->use_count() > 1)
+        if (this->use_count() > 1) {
             return derived()->clone();
-        else
+        } else {
             return assume_mutable();
+        }
     }
 
 public:
diff --git a/be/src/vec/common/string_ref.h b/be/src/vec/common/string_ref.h
index 8f83d072ad..fc8fe5187b 100644
--- a/be/src/vec/common/string_ref.h
+++ b/be/src/vec/common/string_ref.h
@@ -29,6 +29,7 @@
 #include "gutil/hash/city.h"
 #include "gutil/hash/hash128to64.h"
 #include "udf/udf.h"
+#include "util/slice.h"
 #include "vec/common/unaligned.h"
 #include "vec/core/types.h"
 
@@ -54,6 +55,7 @@ struct StringRef {
 
     std::string to_string() const { return std::string(data, size); }
     std::string_view to_string_view() const { return std::string_view(data, size); }
+    doris::Slice to_slice() const { return doris::Slice(data, size); }
 
     // this is just for show, eg. print data to error log, to avoid print large string.
     std::string to_prefix(size_t length) const { return std::string(data, std::min(length, size)); }
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index ea3300eaf3..7da3d86d1e 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -48,7 +48,11 @@ doris::Status VExprContext::prepare(doris::RuntimeState* state,
                                     const doris::RowDescriptor& row_desc,
                                     const std::shared_ptr<doris::MemTracker>& tracker) {
     _prepared = true;
-    _mem_tracker = tracker;
+    if (!tracker) {
+        _mem_tracker = tls_ctx()->_thread_mem_tracker_mgr->mem_tracker();
+    } else {
+        _mem_tracker = tracker;
+    }
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     _pool.reset(new MemPool(_mem_tracker.get()));
     return _root->prepare(state, row_desc, this);
@@ -78,7 +82,7 @@ void VExprContext::close(doris::RuntimeState* state) {
         _fn_contexts[i]->impl()->close();
     }
     // _pool can be NULL if Prepare() was never called
-    if (_pool != NULL) {
+    if (_pool != nullptr) {
         _pool->free_all();
     }
     _closed = true;
@@ -140,7 +144,9 @@ Block VExprContext::get_output_block_after_execute_exprs(
     for (auto vexpr_ctx : output_vexpr_ctxs) {
         int result_column_id = -1;
         status = vexpr_ctx->execute(&tmp_block, &result_column_id);
-        if (UNLIKELY(!status.ok())) return {};
+        if (UNLIKELY(!status)) {
+            return {};
+        }
         DCHECK(result_column_id != -1);
         result_columns.emplace_back(tmp_block.get_by_position(result_column_id));
     }
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index 86a9d086a6..bfa4468916 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -29,7 +29,7 @@ public:
     VExprContext(VExpr* expr);
     ~VExprContext();
     Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
-                   const std::shared_ptr<MemTracker>& tracker);
+                   const std::shared_ptr<MemTracker>& tracker = nullptr);
     Status open(RuntimeState* state);
     void close(RuntimeState* state);
     Status clone(RuntimeState* state, VExprContext** new_ctx);
@@ -61,7 +61,7 @@ public:
     static Block get_output_block_after_execute_exprs(const std::vector<vectorized::VExprContext*>&,
                                                       const Block&, Status&);
 
-    int get_last_result_column_id() {
+    int get_last_result_column_id() const {
         DCHECK(_last_result_column_id != -1);
         return _last_result_column_id;
     }
diff --git a/be/src/vec/functions/function_hash.cpp b/be/src/vec/functions/function_hash.cpp
index 92e2a55827..4a6235c9a6 100644
--- a/be/src/vec/functions/function_hash.cpp
+++ b/be/src/vec/functions/function_hash.cpp
@@ -223,7 +223,7 @@ struct MurmurHash3Impl32 {
 };
 using FunctionMurmurHash3_32 = FunctionVariadicArgumentsBase<DataTypeInt32, MurmurHash3Impl32>;
 
-void register_function_function_hash(SimpleFunctionFactory& factory) {
+void register_function_hash(SimpleFunctionFactory& factory) {
     factory.register_function<FunctionMurmurHash2_64>();
     factory.register_function<FunctionMurmurHash3_32>();
 }
diff --git a/be/src/vec/functions/function_ifnull.cpp b/be/src/vec/functions/function_ifnull.cpp
index bbd66e51e4..1f9c9d48dc 100644
--- a/be/src/vec/functions/function_ifnull.cpp
+++ b/be/src/vec/functions/function_ifnull.cpp
@@ -21,7 +21,7 @@
 #include "function_ifnull.h"
 
 namespace doris::vectorized {
-void register_function_function_ifnull(SimpleFunctionFactory& factory) {
+void register_function_ifnull(SimpleFunctionFactory& factory) {
     factory.register_function<FunctionIfNull>();
     factory.register_alias(FunctionIfNull::name, "nvl");
 }
diff --git a/be/src/vec/functions/functions_geo.cpp b/be/src/vec/functions/functions_geo.cpp
index 71018a9b67..534bc1300e 100644
--- a/be/src/vec/functions/functions_geo.cpp
+++ b/be/src/vec/functions/functions_geo.cpp
@@ -517,7 +517,7 @@ struct StGeoFromText {
     }
 };
 
-void register_geo_functions(SimpleFunctionFactory& factory) {
+void register_function_geo(SimpleFunctionFactory& factory) {
     factory.register_function<GeoFunction<StPoint>>();
     factory.register_function<GeoFunction<StAsText<StAsWktName>>>();
     factory.register_function<GeoFunction<StAsText<StAsTextName>>>();
diff --git a/be/src/vec/functions/simple_function_factory.h b/be/src/vec/functions/simple_function_factory.h
index 986528f25f..7e56dbac99 100644
--- a/be/src/vec/functions/simple_function_factory.h
+++ b/be/src/vec/functions/simple_function_factory.h
@@ -63,8 +63,8 @@ void register_function_date_time_computation(SimpleFunctionFactory& factory);
 void register_function_timestamp(SimpleFunctionFactory& factory);
 void register_function_utility(SimpleFunctionFactory& factory);
 void register_function_json(SimpleFunctionFactory& factory);
-void register_function_function_hash(SimpleFunctionFactory& factory);
-void register_function_function_ifnull(SimpleFunctionFactory& factory);
+void register_function_hash(SimpleFunctionFactory& factory);
+void register_function_ifnull(SimpleFunctionFactory& factory);
 void register_function_like(SimpleFunctionFactory& factory);
 void register_function_regexp(SimpleFunctionFactory& factory);
 void register_function_random(SimpleFunctionFactory& factory);
@@ -75,7 +75,7 @@ void register_function_convert_tz(SimpleFunctionFactory& factory);
 void register_function_least_greast(SimpleFunctionFactory& factory);
 void register_function_fake(SimpleFunctionFactory& factory);
 void register_function_array(SimpleFunctionFactory& factory);
-void register_geo_functions(SimpleFunctionFactory& factory);
+void register_function_geo(SimpleFunctionFactory& factory);
 
 void register_function_encryption(SimpleFunctionFactory& factory);
 void register_function_regexp_extract(SimpleFunctionFactory& factory);
@@ -194,8 +194,8 @@ public:
             register_function_date_time_to_string(instance);
             register_function_date_time_string_to_string(instance);
             register_function_json(instance);
-            register_function_function_hash(instance);
-            register_function_function_ifnull(instance);
+            register_function_hash(instance);
+            register_function_ifnull(instance);
             register_function_comparison_eq_for_null(instance);
             register_function_like(instance);
             register_function_regexp(instance);
@@ -210,7 +210,7 @@ public:
             register_function_regexp_extract(instance);
             register_function_hex_variadic(instance);
             register_function_array(instance);
-            register_geo_functions(instance);
+            register_function_geo(instance);
         });
         return instance;
     }
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 62e583849f..aeeb84c679 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -23,16 +23,15 @@
 #include "olap/storage_engine.h"
 #include "runtime/mem_pool.h"
 #include "runtime/mem_tracker.h"
+#include "vec/aggregate_functions/aggregate_function_reader.h"
 #include "vec/olap/vcollect_iterator.h"
 
 namespace doris::vectorized {
 
 BlockReader::~BlockReader() {
     for (int i = 0; i < _agg_functions.size(); ++i) {
-        AggregateFunctionPtr function = _agg_functions[i];
-        AggregateDataPtr place = _agg_places[i];
-        function->destroy(place);
-        delete[] place;
+        _agg_functions[i]->destroy(_agg_places[i]);
+        delete[] _agg_places[i];
     }
 }
 
@@ -85,22 +84,11 @@ void BlockReader::_init_agg_state(const ReaderParams& read_params) {
 
     auto& tablet_schema = tablet()->tablet_schema();
     for (auto idx : _agg_columns_idx) {
-        FieldAggregationMethod agg_method =
+        AggregateFunctionPtr function =
                 tablet_schema
                         .column(read_params.origin_return_columns->at(_return_columns_loc[idx]))
-                        .aggregation();
-        std::string agg_name =
-                TabletColumn::get_string_by_aggregation_type(agg_method) + AGG_READER_SUFFIX;
-        std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(),
-                       [](unsigned char c) { return std::tolower(c); });
-
-        // create aggregate function
-        DataTypes argument_types;
-        argument_types.push_back(_next_row.block->get_data_type(idx));
-        Array params;
-        AggregateFunctionPtr function = AggregateFunctionSimpleFactory::instance().get(
-                agg_name, argument_types, params,
-                _next_row.block->get_data_type(idx)->is_nullable());
+                        .get_aggregate_function({_next_row.block->get_data_type(idx)},
+                                                vectorized::AGG_READER_SUFFIX);
         DCHECK(function != nullptr);
         _agg_functions.push_back(function);
         // create aggregate data
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index 5da9641fcb..604d2a02ec 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -123,7 +123,9 @@ std::pair<Status, IOlapColumnDataAccessor*> OlapBlockDataConvertor::convert_colu
 // class OlapBlockDataConvertor::OlapColumnDataConvertorBase
 void OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column(
         const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) {
-    assert(num_rows > 0 && row_pos + num_rows <= typed_column.column->size());
+    DCHECK(row_pos + num_rows <= typed_column.column->size())
+            << "row_pos=" << row_pos << ", num_rows=" << num_rows
+            << ", typed_column.column->size()=" << typed_column.column->size();
     _typed_column = typed_column;
     _row_pos = row_pos;
     _num_rows = num_rows;
@@ -358,55 +360,21 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorChar::convert_to_olap() {
         column_string = assert_cast<const vectorized::ColumnString*>(_typed_column.column.get());
     }
 
-    assert(column_string);
-
     // If column_string is not padded to full, we should do padding here.
     if (should_padding(column_string, _length)) {
         _column = clone_and_padding(column_string, _length);
         column_string = assert_cast<const vectorized::ColumnString*>(_column.get());
     }
 
-    const ColumnString::Char* char_data = column_string->get_chars().data();
-    const ColumnString::Offset* offset_cur = column_string->get_offsets().data() + _row_pos;
-    const ColumnString::Offset* offset_end = offset_cur + _num_rows;
-    Slice* slice = _slice.data();
-    size_t string_length;
-    size_t string_offset = *(offset_cur - 1);
-    [[maybe_unused]] size_t slice_size = _length;
-    if (_nullmap) {
-        const UInt8* nullmap_cur = _nullmap + _row_pos;
-        while (offset_cur != offset_end) {
-            if (!*nullmap_cur) {
-                string_length = *offset_cur - string_offset - 1;
-                assert(string_length <= slice_size);
-                slice->data = (char*)char_data + string_offset;
-                slice->size = string_length;
-            } else {
-                // TODO: this may not be necessary, check and remove later
-                slice->data = nullptr;
-                slice->size = 0;
-            }
-
-            string_offset = *offset_cur;
-            ++nullmap_cur;
-            ++slice;
-            ++offset_cur;
+    for (size_t i = 0; i < _num_rows; i++) {
+        if (!_nullmap || !_nullmap[i + _row_pos]) {
+            _slice[i] = column_string->get_data_at(i + _row_pos).to_slice();
+            DCHECK(_slice[i].size == _length)
+                    << "char type data length not equal to schema, schema=" << _length
+                    << ", real=" << _slice[i].size;
         }
-        assert(nullmap_cur == _nullmap + _row_pos + _num_rows && slice == _slice.get_end_ptr());
-    } else {
-        while (offset_cur != offset_end) {
-            string_length = *offset_cur - string_offset - 1;
-            assert(string_length <= slice_size);
-
-            slice->data = (char*)char_data + string_offset;
-            slice->size = string_length;
-
-            string_offset = *offset_cur;
-            ++slice;
-            ++offset_cur;
-        }
-        assert(slice == _slice.get_end_ptr());
     }
+
     return Status::OK();
 }
 
diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h
index d0f5d01525..e11fb672b2 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -50,6 +50,7 @@ private:
     class OlapColumnDataConvertorBase : public IOlapColumnDataAccessor {
     public:
         OlapColumnDataConvertorBase() = default;
+        ~OlapColumnDataConvertorBase() override = default;
         OlapColumnDataConvertorBase(const OlapColumnDataConvertorBase&) = delete;
         OlapColumnDataConvertorBase& operator=(const OlapColumnDataConvertorBase&) = delete;
         OlapColumnDataConvertorBase(OlapColumnDataConvertorBase&&) = delete;
@@ -120,6 +121,11 @@ private:
                 column->offsets[i] = (i + 1) * (padding_length + 1);
 
                 auto str = input->get_data_at(i);
+
+                DCHECK(str.size <= padding_length)
+                        << "char type data length over limit, padding_length=" << padding_length
+                        << ", real=" << str.size;
+
                 if (str.size) {
                     memcpy(padded_column->chars.data() + i * (padding_length + 1), str.data,
                            str.size);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org