You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/01/24 02:10:39 UTC

[incubator-doris] branch master updated: [Load Parallel][2/3] Support parallel flushing memtable during load (#5163)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ab06e92  [Load Parallel][2/3] Support parallel flushing memtable during load (#5163)
ab06e92 is described below

commit ab06e9202195dd8312adf2368d8fd6f0827a295d
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Sun Jan 24 10:10:30 2021 +0800

    [Load Parallel][2/3] Support parallel flushing memtable during load (#5163)
    
    In the previous implementation, within a load job,
    multiple memtables of the same tablet were written to disk sequentially.
    In fact, multiple memtables can be written out of order in parallel;
    we only need to ensure that each memtable uses a different segment writer.
---
 be/src/common/config.h                    |  2 +-
 be/src/olap/delta_writer.cpp              |  2 +-
 be/src/olap/memtable.cpp                  | 45 ++++++++++++++++----
 be/src/olap/memtable.h                    | 26 ++++++++++++
 be/src/olap/memtable_flush_executor.cpp   | 21 ++++++++--
 be/src/olap/memtable_flush_executor.h     |  6 ++-
 be/src/olap/rowset/beta_rowset_writer.cpp | 69 ++++++++++++++++++++++++-------
 be/src/olap/rowset/beta_rowset_writer.h   | 20 ++++++---
 be/src/olap/rowset/rowset_writer.h        |  5 +++
 9 files changed, 162 insertions(+), 34 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9a28c6b..2d5526e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -34,7 +34,7 @@ CONF_Int32(be_port, "9060");
 CONF_Int32(brpc_port, "8060");
 
 // the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
-CONF_Int32(brpc_num_threads, "-1")
+CONF_Int32(brpc_num_threads, "-1");
 
 // Declare a selection strategy for those servers have many ips.
 // Note that there should at most one ip match this list.
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 5339a47..1ed44fb 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -169,7 +169,7 @@ OLAPStatus DeltaWriter::init() {
     _reset_mem_table();
 
     // create flush handler
-    RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token));
+    RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token, writer_context.rowset_type));
 
     _is_init = true;
     return OLAP_SUCCESS;
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 2ff4b58..f728af5 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -121,14 +121,21 @@ OLAPStatus MemTable::flush() {
     int64_t duration_ns = 0;
     {
         SCOPED_RAW_TIMER(&duration_ns);
-        Table::Iterator it(_skip_list);
-        for (it.SeekToFirst(); it.Valid(); it.Next()) {
-            char* row = (char*)it.key();
-            ContiguousRow dst_row(_schema, row);
-            agg_finalize_row(&dst_row, _table_mem_pool.get());
-            RETURN_NOT_OK(_rowset_writer->add_row(dst_row));
+        OLAPStatus st = _rowset_writer->flush_single_memtable(this, &_flush_size);
+        if (st == OLAP_ERR_FUNC_NOT_IMPLEMENTED) {
+            // For alpha rowset, we do not implement "flush_single_memtable".
+            // Flush the memtable like the old way.
+            Table::Iterator it(_skip_list);
+            for (it.SeekToFirst(); it.Valid(); it.Next()) {
+                char* row = (char*)it.key();
+                ContiguousRow dst_row(_schema, row);
+                agg_finalize_row(&dst_row, _table_mem_pool.get());
+                RETURN_NOT_OK(_rowset_writer->add_row(dst_row));
+            }
+            RETURN_NOT_OK(_rowset_writer->flush());
+        } else {
+            RETURN_NOT_OK(st);
         }
-        RETURN_NOT_OK(_rowset_writer->flush());
     }
     DorisMetrics::instance()->memtable_flush_total->increment(1);
     DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
@@ -139,4 +146,28 @@ OLAPStatus MemTable::close() {
     return flush();
 }
 
+MemTable::Iterator::Iterator(MemTable* memtable):
+    _mem_table(memtable),
+    _it(memtable->_skip_list) {
+}
+// Positions the iterator at the first entry of the memtable's skip list.
+void MemTable::Iterator::seek_to_first() {
+    _it.SeekToFirst();
+}
+// Returns whether the iterator currently points at an entry.
+bool MemTable::Iterator::valid() {
+    return _it.Valid();
+}
+// Advances the iterator to the next entry of the skip list.
+void MemTable::Iterator::next() {
+    _it.Next();
+}
+// Returns the current entry as a ContiguousRow, after running agg_finalize_row on it.
+ContiguousRow MemTable::Iterator::get_current_row() {
+    char* row = (char*) _it.key();
+    ContiguousRow dst_row(_mem_table->_schema, row);
+    agg_finalize_row(&dst_row, _mem_table->_table_mem_pool.get());
+    return dst_row;
+}
+
 } // namespace doris
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 633ab33..42ded4e 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -46,9 +46,12 @@ public:
     int64_t tablet_id() const { return _tablet_id; }
     size_t memory_usage() const { return _mem_tracker->consumption(); }
     void insert(const Tuple* tuple);
+    /// Flush all rows of this memtable to its rowset writer.
     OLAPStatus flush();
     OLAPStatus close();
 
+    int64_t flush_size() const { return _flush_size; }
+
 private:
     class RowCursorComparator {
     public:
@@ -58,9 +61,29 @@ private:
     private:
         const Schema* _schema;
     };
+
     typedef SkipList<char*, RowCursorComparator> Table;
     typedef Table::key_type TableKey;
 
+public:
+    /// The iterator of memtable, so that the data in this memtable
+    /// can be visited outside.
+    class Iterator {
+        public:
+            explicit Iterator(MemTable* mem_table);
+            ~Iterator() = default;
+
+            void seek_to_first();
+            bool valid();
+            void next();
+            // Applies agg_finalize_row to the current entry before returning it.
+            ContiguousRow get_current_row();
+        private:
+            MemTable* _mem_table;
+            Table::Iterator _it;
+    };
+
+private:
     void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool);
     void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist);
 
@@ -89,6 +112,9 @@ private:
 
     RowsetWriter* _rowset_writer;
 
+    // the data size flushed on disk of this memtable
+    int64_t _flush_size = 0;
+
 }; // class MemTable
 
 inline std::ostream& operator<<(std::ostream& os, const MemTable& table) {
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index e3f6c90..fb78a55 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -26,7 +26,9 @@ namespace doris {
 
 std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
     os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000
-       << ", flush count=" << stat.flush_count << ")";
+       << ", flush count=" << stat.flush_count
+       << ", flush bytes: " << stat.flush_size_bytes
+       << ", flush disk bytes: " << stat.flush_disk_size_bytes << ")";
     return os;
 }
 
@@ -65,9 +67,14 @@ void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable) {
         return;
     }
 
+    VLOG_CRITICAL << "flush memtable cost: " << timer.elapsed_time()
+            << ", count: " << _stats.flush_count
+            << ", mem size: " << memtable->memory_usage()
+            << ", disk size: " << memtable->flush_size();
     _stats.flush_time_ns += timer.elapsed_time();
     _stats.flush_count++;
     _stats.flush_size_bytes += memtable->memory_usage();
+    _stats.flush_disk_size_bytes += memtable->flush_size();
 }
 
 void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
@@ -81,8 +88,16 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
 }
 
 // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
-OLAPStatus MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>* flush_token) {
-    flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
+OLAPStatus MemTableFlushExecutor::create_flush_token(
+        std::unique_ptr<FlushToken>* flush_token,
+        RowsetTypePB rowset_type) {
+    if (rowset_type == BETA_ROWSET) {
+        // Beta rowset: each memtable uses its own segment writer, so memtables can be flushed CONCURRENTly.
+        flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)));
+    } else {
+        // Alpha rowset does not support concurrent flushing; SERIAL keeps its memtables flushed in order.
+        flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
+    }
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index 9f8a8c9..4b6795b 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <vector>
 
+#include "gen_cpp/olap_file.pb.h"
 #include "olap/olap_define.h"
 #include "util/threadpool.h"
 
@@ -37,6 +38,7 @@ struct FlushStatistic {
     int64_t flush_time_ns = 0;
     int64_t flush_count = 0;
     int64_t flush_size_bytes = 0;
+    int64_t flush_disk_size_bytes = 0;
 };
 
 std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
@@ -95,7 +97,9 @@ public:
     // because it needs path hash of each data dir.
     void init(const std::vector<DataDir*>& data_dirs);
 
-    OLAPStatus create_flush_token(std::unique_ptr<FlushToken>* flush_token);
+    OLAPStatus create_flush_token(
+            std::unique_ptr<FlushToken>* flush_token,
+            RowsetTypePB rowset_type);
 
 private:
     std::unique_ptr<ThreadPool> _flush_pool;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index e4ebd75..84207c2 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -24,6 +24,7 @@
 #include "env/env.h"
 #include "gutil/strings/substitute.h"
 #include "olap/fs/fs_util.h"
+#include "olap/memtable.h"
 #include "olap/olap_define.h"
 #include "olap/row.h"        // ContiguousRow
 #include "olap/row_cursor.h" // RowCursor
@@ -90,7 +91,7 @@ OLAPStatus BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_conte
 template <typename RowType>
 OLAPStatus BetaRowsetWriter::_add_row(const RowType& row) {
     if (PREDICT_FALSE(_segment_writer == nullptr)) {
-        RETURN_NOT_OK(_create_segment_writer());
+        RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
     }
     // TODO update rowset zonemap
     auto s = _segment_writer->append_row(row);
@@ -100,7 +101,7 @@ OLAPStatus BetaRowsetWriter::_add_row(const RowType& row) {
     }
     if (PREDICT_FALSE(_segment_writer->estimate_segment_size() >= MAX_SEGMENT_SIZE ||
                       _segment_writer->num_rows_written() >= _context.max_rows_per_segment)) {
-        RETURN_NOT_OK(_flush_segment_writer());
+        RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
     }
     ++_num_rows_written;
     return OLAP_SUCCESS;
@@ -131,11 +132,47 @@ OLAPStatus BetaRowsetWriter::add_rowset_for_linked_schema_change(
 
 OLAPStatus BetaRowsetWriter::flush() {
     if (_segment_writer != nullptr) {
-        RETURN_NOT_OK(_flush_segment_writer());
+        RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
     }
     return OLAP_SUCCESS;
 }
 
+OLAPStatus BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flush_size) {
+    // NOTE(review): if other memtables flush through this writer concurrently, the delta
+    // computed below may also count their segments, so treat *flush_size as approximate.
+    int64_t current_flush_size = _total_data_size + _total_index_size;
+    // Each memtable uses its own segment writer, so memtables can be flushed in parallel.
+    std::unique_ptr<segment_v2::SegmentWriter> writer;
+
+    MemTable::Iterator it(memtable);
+    for (it.seek_to_first(); it.valid(); it.next()) {
+        // Create the writer lazily: an empty memtable must not produce an empty segment,
+        // and a fresh segment is needed after the previous one was flushed below.
+        if (PREDICT_FALSE(writer == nullptr)) {
+            RETURN_NOT_OK(_create_segment_writer(&writer));
+        }
+        ContiguousRow dst_row = it.get_current_row();
+        auto s = writer->append_row(dst_row);
+        if (PREDICT_FALSE(!s.ok())) {
+            LOG(WARNING) << "failed to append row: " << s.to_string();
+            return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
+        }
+        if (PREDICT_FALSE(writer->estimate_segment_size() >= MAX_SEGMENT_SIZE ||
+                          writer->num_rows_written() >= _context.max_rows_per_segment)) {
+            // _flush_segment_writer() resets `writer`; the next row re-creates it.
+            RETURN_NOT_OK(_flush_segment_writer(&writer));
+        }
+        ++_num_rows_written;
+    }
+
+    if (writer != nullptr) {
+        RETURN_NOT_OK(_flush_segment_writer(&writer));
+    }
+
+    *flush_size = (_total_data_size + _total_index_size) - current_flush_size;
+    return OLAP_SUCCESS;
+}
+
 RowsetSharedPtr BetaRowsetWriter::build() {
     // TODO(lingbin): move to more better place, or in a CreateBlockBatch?
     for (auto& wblock : _wblocks) {
@@ -172,9 +209,9 @@ RowsetSharedPtr BetaRowsetWriter::build() {
     return rowset;
 }
 
-OLAPStatus BetaRowsetWriter::_create_segment_writer() {
+OLAPStatus BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer) {
     auto path = BetaRowset::segment_file_path(_context.rowset_path_prefix, _context.rowset_id,
-                                              _num_segment);
+                                              _num_segment++);
     // TODO(lingbin): should use a more general way to get BlockManager object
     // and tablets with the same type should share one BlockManager object;
     fs::BlockManager* block_mgr = fs::fs_util::block_manager();
@@ -189,31 +226,33 @@ OLAPStatus BetaRowsetWriter::_create_segment_writer() {
 
     DCHECK(wblock != nullptr);
     segment_v2::SegmentWriterOptions writer_options;
-    _segment_writer.reset(new segment_v2::SegmentWriter(wblock.get(), _num_segment,
-                                                        _context.tablet_schema, writer_options));
-    _wblocks.push_back(std::move(wblock));
-    // TODO set write_mbytes_per_sec based on writer type (load/base compaction/cumulative compaction)
-    auto s = _segment_writer->init(config::push_write_mbytes_per_sec);
+    writer->reset(new segment_v2::SegmentWriter(wblock.get(), _num_segment,
+                                                _context.tablet_schema, writer_options));
+    {
+        std::lock_guard<SpinLock> l(_lock);
+        _wblocks.push_back(std::move(wblock));
+    }
+
+    auto s = (*writer)->init(config::push_write_mbytes_per_sec);
     if (!s.ok()) {
         LOG(WARNING) << "failed to init segment writer: " << s.to_string();
-        _segment_writer.reset(nullptr);
+        writer->reset(nullptr);
         return OLAP_ERR_INIT_FAILED;
     }
-    ++_num_segment;
     return OLAP_SUCCESS;
 }
 
-OLAPStatus BetaRowsetWriter::_flush_segment_writer() {
+OLAPStatus BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer) {
     uint64_t segment_size;
     uint64_t index_size;
-    Status s = _segment_writer->finalize(&segment_size, &index_size);
+    Status s = (*writer)->finalize(&segment_size, &index_size);
     if (!s.ok()) {
         LOG(WARNING) << "failed to finalize segment: " << s.to_string();
         return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
     }
     _total_data_size += segment_size;
     _total_index_size += index_size;
-    _segment_writer.reset();
+    writer->reset();
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index c0c0467..c174a90 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -51,6 +51,9 @@ public:
 
     OLAPStatus flush() override;
 
+    // Return the file size flushed to disk in "flush_size"
+    OLAPStatus flush_single_memtable(MemTable* memtable, int64_t* flush_size) override;
+
     RowsetSharedPtr build() override;
 
     Version version() override { return _context.version; }
@@ -63,27 +66,32 @@ private:
     template <typename RowType>
     OLAPStatus _add_row(const RowType& row);
 
-    OLAPStatus _create_segment_writer();
+    OLAPStatus _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer);
 
-    OLAPStatus _flush_segment_writer();
+    OLAPStatus _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer);
 
 private:
     RowsetWriterContext _context;
     std::shared_ptr<RowsetMeta> _rowset_meta;
 
-    int _num_segment;
+    AtomicInt<int32_t> _num_segment;
+    /// When flushing the memtable in the load process, we do not use this writer but an independent writer.
+    /// Because we want to flush memtables in parallel.
+    /// In other processes, such as merger or schema change, we will use this unified writer for data writing.
     std::unique_ptr<segment_v2::SegmentWriter> _segment_writer;
+    mutable SpinLock _lock; // lock to protect _wblocks.
     // TODO(lingbin): it is better to wrapper in a Batch?
     std::vector<std::unique_ptr<fs::WritableBlock>> _wblocks;
 
     // counters and statistics maintained during data write
-    int64_t _num_rows_written;
-    int64_t _total_data_size;
-    int64_t _total_index_size;
+    AtomicInt<int64_t> _num_rows_written;
+    AtomicInt<int64_t> _total_data_size;
+    AtomicInt<int64_t> _total_index_size;
     // TODO rowset Zonemap
 
     bool _is_pending = false;
     bool _already_built = false;
+
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index ba698c5..74a6485 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -27,6 +27,7 @@
 namespace doris {
 
 class ContiguousRow;
+class MemTable;
 class RowCursor;
 
 class RowsetWriter {
@@ -52,6 +53,10 @@ public:
     // note that `add_row` could also trigger flush when certain conditions are met
     virtual OLAPStatus flush() = 0;
 
+    virtual OLAPStatus flush_single_memtable(MemTable* memtable, int64_t* flush_size) {
+        return OLAP_ERR_FUNC_NOT_IMPLEMENTED; // MemTable::flush() then falls back to add_row() + flush()
+    }
+
     // finish building and return pointer to the built rowset (guaranteed to be inited).
     // return nullptr when failed
     virtual RowsetSharedPtr build() = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org