You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/07/19 15:24:48 UTC

[GitHub] [doris] compiletheworld commented on a diff in pull request #11016: [feature] Support buffering data and automatically committing for stream load

compiletheworld commented on code in PR #11016:
URL: https://github.com/apache/doris/pull/11016#discussion_r924488903


##########
be/src/olap/wal_writer.cpp:
##########
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/wal_writer.h"
+
+#include "olap/storage_engine.h"
+#include "olap/utils.h"
+
+namespace doris {
+
+WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name), _row_count(0) {
+    timer.start();
+}
+
+WalWriter::~WalWriter() {}
+
+Status WalWriter::init() {
+    Status st = _file_handler.open_with_mode(_file_name, O_CREAT | O_EXCL | O_WRONLY,
+                                             S_IRUSR | S_IWUSR);
+    if (UNLIKELY(!st.ok())) {
+        LOG(WARNING) << "fail to open wal file: '" << _file_name << "', error: " << st.to_string();
+    }
+    return st;
+}
+
+Status WalWriter::finalize() {
+    Status st = _file_handler.close();
+    if (UNLIKELY(!st.ok())) {
+        LOG(WARNING) << "fail to close wal file: '" << _file_name << "', error: " << st.to_string();
+    }
+    return st;
+}
+
+Status WalWriter::append_rows(const PDataRowArray& rows) {
+    size_t total_size = 0;
+    for (const auto& row : rows) {
+        total_size += ROW_LENGTH_SIZE + row.ByteSizeLong();
+    }
+    // allocate memory from heap, since stack size is limited
+    // uint8_t row_binary[total_size];
+    std::shared_ptr<void> binary(malloc(total_size), free);

Review Comment:
   Using `std::string binary(total_size, '\0');` instead of two pointers would make this code simpler and easier to read.



##########
be/src/olap/wal_reader.cpp:
##########
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/wal_reader.h"
+
+#include "common/status.h"
+#include "olap/storage_engine.h"
+#include "olap/wal_writer.h"
+
+namespace doris {
+
+WalReader::WalReader(const std::string& file_name) : _file_name(file_name), _offset(0) {}
+
+WalReader::~WalReader() {}
+
+Status WalReader::init() {
+    Status st = _file_handler.open(_file_name, O_RDONLY);
+    if (UNLIKELY(!st.ok())) {
+        LOG(WARNING) << "fail to open wal file: '" << _file_name << "', error: " << st.to_string();
+    }
+    return st;
+}
+
+Status WalReader::finalize() {
+    Status st = _file_handler.close();
+    if (UNLIKELY(!st.ok())) {
+        LOG(WARNING) << "fail to close wal file: '" << _file_name << "', error: " << st.to_string();
+    }
+    return st;
+}
+
+Status WalReader::read_row(PDataRow& row) {
+    if (_offset >= _file_handler.length()) {
+        return Status::EndOfFile("end of wal file");
+    }
+    // read row length
+    uint8_t row_len_buf[WalWriter::ROW_LENGTH_SIZE];
+    RETURN_IF_ERROR(_file_handler.pread(row_len_buf, WalWriter::ROW_LENGTH_SIZE, _offset));
+    _offset += WalWriter::ROW_LENGTH_SIZE;
+    size_t row_len;

Review Comment:
   All we need to check is that `ROW_LENGTH_SIZE == sizeof(row_len)`.
   Wouldn't using `_file_handler.pread()` to read the value directly into `&row_len` be much more straightforward?



##########
be/src/olap/wal_writer.h:
##########
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_ROWSET_WAL_WRITER_H
+#define DORIS_BE_SRC_OLAP_ROWSET_WAL_WRITER_H
+
+#include "gen_cpp/internal_service.pb.h"
+#include "olap/file_helper.h"
+
+namespace doris {
+
+using PDataRowArray = google::protobuf::RepeatedPtrField<PDataRow>;
+
+class WalWriter {
+public:
+    explicit WalWriter(const std::string& file_name);
+    ~WalWriter();
+
+    Status init();

Review Comment:
   It would be better to leave some comments on these function signatures.



##########
be/src/olap/wal_writer.cpp:
##########
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/wal_writer.h"
+
+#include "olap/storage_engine.h"
+#include "olap/utils.h"
+
+namespace doris {
+
+WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name), _row_count(0) {
+    timer.start();
+}
+
+WalWriter::~WalWriter() {}
+
+Status WalWriter::init() {
+    Status st = _file_handler.open_with_mode(_file_name, O_CREAT | O_EXCL | O_WRONLY,
+                                             S_IRUSR | S_IWUSR);
+    if (UNLIKELY(!st.ok())) {
+        LOG(WARNING) << "fail to open wal file: '" << _file_name << "', error: " << st.to_string();
+    }
+    return st;
+}
+
+Status WalWriter::finalize() {
+    Status st = _file_handler.close();
+    if (UNLIKELY(!st.ok())) {
+        LOG(WARNING) << "fail to close wal file: '" << _file_name << "', error: " << st.to_string();
+    }
+    return st;
+}
+
+Status WalWriter::append_rows(const PDataRowArray& rows) {
+    size_t total_size = 0;
+    for (const auto& row : rows) {
+        total_size += ROW_LENGTH_SIZE + row.ByteSizeLong();
+    }
+    // allocate memory from heap, since stack size is limited
+    // uint8_t row_binary[total_size];
+    std::shared_ptr<void> binary(malloc(total_size), free);
+    uint8_t* row_binary = static_cast<uint8_t*>(binary.get());
+    memset(row_binary, 0, total_size);
+    size_t offset = 0;
+    for (const auto& row : rows) {
+        unsigned long row_length = row.GetCachedSize();
+        memcpy(row_binary + offset, &row_length, ROW_LENGTH_SIZE);
+        offset += ROW_LENGTH_SIZE;
+        memcpy(row_binary + offset, row.SerializeAsString().data(), row_length);
+        offset += row_length;
+    }
+    // write rows
+    RETURN_IF_ERROR(_file_handler.write(row_binary, total_size));

Review Comment:
   Can we write in the loop?
   If not, it seems that we can pipeline `row.SerializeAsString()` and `_file_handler.write()`?



##########
be/src/runtime/auto_batch_load_mgr.cpp:
##########
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/auto_batch_load_mgr.h"
+
+#include "gutil/strings/split.h"
+#include "gutil/strtoint.h"
+#include "olap/storage_engine.h"
+#include "runtime/exec_env.h"
+#include "util/file_utils.h"
+
+namespace doris {
+
+class RuntimeProfile;
+class FragmentMgr;
+
+AutoBatchLoadMgr::AutoBatchLoadMgr(ExecEnv* exec_env)
+        : _exec_env(exec_env), _wal_recovering(true), _stop_background_threads_latch(1) {
+    // start a thread to scan WAL directory and do recovery
+    std::thread(&AutoBatchLoadMgr::recovery_wals, this).detach();
+    // start a thread to check if auto commit conditions are satisfied and do commit
+    auto st = Thread::create(
+            "AutoBatchLoadMgr", "commit_auto_batch_load",
+            [this]() { this->_commit_auto_batch_load(); }, &_commit_thread);
+    CHECK(st.ok()) << st.to_string();
+}
+
+AutoBatchLoadMgr::~AutoBatchLoadMgr() {
+    _stop_background_threads_latch.count_down();
+    if (_commit_thread) {
+        _commit_thread->join();
+    }
+}
+
+Status AutoBatchLoadMgr::auto_batch_load(const PAutoBatchLoadRequest* request, std::string& label,
+                                         int64_t& txn_id) {
+    if (_wal_recovering) {
+        return Status::InternalError("recovering WALs");
+    }
+    auto db_id = request->db_id();
+    auto table_id = request->table_id();
+    if (!_table_map.count(table_id)) {
+        std::lock_guard<std::mutex> lock(_lock);
+        if (!_table_map.count(table_id)) {
+            _table_map.emplace(table_id,
+                               std::make_shared<AutoBatchLoadTable>(_exec_env, db_id, table_id));
+        }
+    }
+    auto it = _table_map.find(table_id);
+    return it->second->auto_batch_load(request, label, txn_id);
+}
+
+// When starting be, scan auto_batch_load directory and recovery WALs
+void AutoBatchLoadMgr::recovery_wals() {
+    // port == 0 means not received heartbeat yet
+    while (_exec_env->master_info()->network_address.port == 0) {
+        sleep(5);
+        continue;
+    }
+    std::string path = _exec_env->storage_engine()->auto_batch_load_dir();
+    std::vector<string> db_ids;
+    // TODO should we abort if list directory failed or recovery wal failed
+    Status st = FileUtils::list_files(Env::Default(), path, &db_ids);
+    // map of {table_id, {wal_id, wal_path}}
+    std::map<int64_t, std::map<int64_t, std::string>> table_wal_map;
+    for (const auto& db_id_str : db_ids) {
+        int64_t db_id = atoi64(db_id_str);
+        std::vector<string> table_ids;
+        auto db_path = path + "/" + db_id_str;
+        st = FileUtils::list_files(Env::Default(), db_path, &table_ids);
+        for (const auto& table_id_str : table_ids) {
+            int64_t table_id = atoi64(table_id_str);
+            std::vector<string> wals;
+            auto table_path = db_path + "/" + table_id_str;
+            st = FileUtils::list_files(Env::Default(), table_path, &wals);
+            for (const auto& wal : wals) {
+                int64_t wal_id = atoi64(wal);
+                auto wal_path = table_path + "/" + wal;
+                if (!_table_map.count(table_id)) {
+                    std::lock_guard<std::mutex> lock(_lock);
+                    if (!_table_map.count(table_id)) {
+                        _table_map.emplace(table_id, std::make_shared<AutoBatchLoadTable>(
+                                                             _exec_env, db_id, table_id));
+                    }
+                }
+                table_wal_map[table_id][wal_id] = wal_path;
+            }
+        }
+    }
+    for (const auto& [table_id, wals] : table_wal_map) {
+        for (const auto& [wal_id, wal_path] : wals) {
+            auto it = _table_map.find(table_id);
+            LOG(INFO) << "Start recovery wal: " << wal_path;
+            st = it->second->recovery_wal(wal_id, wal_path);
+            if (st.ok()) {
+                LOG(INFO) << "Finish recovery wal: " << wal_path;
+            } else {
+                LOG(WARNING) << "Failed recovery wal: " << wal_path

Review Comment:
   Is it OK that we do nothing if recovery fails?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org