Posted to commits@doris.apache.org by ya...@apache.org on 2021/11/17 02:18:22 UTC

[incubator-doris] branch master updated: mark the load job as failed when more than half of a tablet's replicas fail to write (#7126)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bc5ba8  mark the load job as failed when more than half of a tablet's replicas fail to write (#7126)
4bc5ba8 is described below

commit 4bc5ba8819d107445410437b20a8246a3732fceb
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Wed Nov 17 10:18:04 2021 +0800

    mark the load job as failed when more than half of a tablet's replicas fail to write (#7126)
    
    Previously, the check counted failed replicas across all tablets, so the job was only marked as failed once more than half of all replica writes had failed, rather than evaluating the quorum per tablet.
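
For context, the semantic change can be sketched outside the Doris sources as a small, self-contained C++ function (names such as failed_nodes_by_tablet and num_replicas are illustrative only, not the project's actual API): the old check compared the total count of failed node channels against the replica quorum, whereas the new check applies the quorum test to each tablet individually.

    #include <cstdint>
    #include <unordered_map>
    #include <unordered_set>

    // Minimal sketch of the per-tablet majority check introduced by this commit.
    // The load is considered intolerably failed as soon as ANY single tablet has
    // lost a majority of its replicas, rather than when the global failure count
    // crosses the quorum.
    bool has_intolerable_failure(
            const std::unordered_map<int64_t, std::unordered_set<int64_t>>& failed_nodes_by_tablet,
            int num_replicas) {
        const size_t quorum = (num_replicas + 1) / 2;
        for (const auto& entry : failed_nodes_by_tablet) {
            // entry.second holds the ids of the nodes whose write failed for this tablet
            if (entry.second.size() >= quorum) {
                return true;
            }
        }
        return false;
    }

With three replicas, for example, two failed writes on the same tablet now fail the job, while two failures spread across two different tablets do not.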
---
 be/src/exec/hdfs_file_reader.cpp        | 28 ++++++++++++++++++----------
 be/src/exec/tablet_sink.cpp             | 10 ++++++++--
 be/src/exec/tablet_sink.h               | 24 ++++++++++++++++++------
 be/test/olap/generic_iterators_test.cpp |  6 ++++--
 4 files changed, 48 insertions(+), 20 deletions(-)

diff --git a/be/src/exec/hdfs_file_reader.cpp b/be/src/exec/hdfs_file_reader.cpp
index 0ba1a3a..4399e63 100644
--- a/be/src/exec/hdfs_file_reader.cpp
+++ b/be/src/exec/hdfs_file_reader.cpp
@@ -81,8 +81,10 @@ Status HdfsFileReader::open() {
     _hdfs_file = hdfsOpenFile(_hdfs_fs, _path.c_str(), O_RDONLY, 0, 0, 0);
     if (_hdfs_file == nullptr) {
         std::stringstream ss;
-        ss << "open file failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
-                << _namenode << _path << ", err: " << strerror(errno);;
+        ss << "open file failed. "
+           << "(BE: " << BackendOptions::get_localhost() << ")" << _namenode << _path
+           << ", err: " << strerror(errno);
+        ;
         return Status::InternalError(ss.str());
     }
     LOG(INFO) << "open file. " << _namenode << _path;
@@ -141,8 +143,10 @@ Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r
         int ret = hdfsSeek(_hdfs_fs, _hdfs_file, position);
         if (ret != 0) { // check fseek return value
             std::stringstream ss;
-            ss << "hdfsSeek failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
-                    << _namenode << _path << ", err: " << strerror(errno);;
+            ss << "hdfsSeek failed. "
+               << "(BE: " << BackendOptions::get_localhost() << ")" << _namenode << _path
+               << ", err: " << strerror(errno);
+            ;
             return Status::InternalError(ss.str());
         }
     }
@@ -150,8 +154,10 @@ Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r
     *bytes_read = hdfsRead(_hdfs_fs, _hdfs_file, out, nbytes);
     if (*bytes_read < 0) {
         std::stringstream ss;
-        ss << "Read hdfs file failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
-                << _namenode << _path << ", err: " << strerror(errno);;
+        ss << "Read hdfs file failed. "
+           << "(BE: " << BackendOptions::get_localhost() << ")" << _namenode << _path
+           << ", err: " << strerror(errno);
+        ;
         return Status::InternalError(ss.str());
     }
     _current_offset += *bytes_read; // save offset with file
@@ -169,7 +175,9 @@ int64_t HdfsFileReader::size() {
         }
         hdfsFileInfo* file_info = hdfsGetPathInfo(_hdfs_fs, _path.c_str());
         if (file_info == nullptr) {
-            LOG(WARNING) << "get path info failed: " << _namenode << _path << ", err: " << strerror(errno);;
+            LOG(WARNING) << "get path info failed: " << _namenode << _path
+                         << ", err: " << strerror(errno);
+            ;
             close();
             return -1;
         }
@@ -185,10 +193,10 @@ int64_t HdfsFileReader::size() {
 Status HdfsFileReader::seek(int64_t position) {
     int res = hdfsSeek(_hdfs_fs, _hdfs_file, position);
     if (res != 0) {
-        char err_buf[64];
         std::stringstream ss;
-        ss << "Seek to offset failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
-                << " offset=" << position << ", err: " << strerror(errno);
+        ss << "Seek to offset failed. "
+           << "(BE: " << BackendOptions::get_localhost() << ")"
+           << " offset=" << position << ", err: " << strerror(errno);
         return Status::InternalError(ss.str());
     }
     return Status::OK();
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 0ee71c5..3e2db46 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -62,7 +62,7 @@ NodeChannel::~NodeChannel() {
 // returned directly via "TabletSink::prepare()" method.
 Status NodeChannel::init(RuntimeState* state) {
     _tuple_desc = _parent->_output_tuple_desc;
-    auto node =  _parent->_nodes_info->find_node(_node_id);
+    auto node = _parent->_nodes_info->find_node(_node_id);
     if (node == nullptr) {
         std::stringstream ss;
         ss << "unknown node id, id=" << _node_id;
@@ -462,6 +462,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
             }
             channel->add_tablet(tablet);
             channels.push_back(channel);
+            _tablets_by_channel[node_id].insert(tablet.tablet_id);
         }
         _channels_by_tablet.emplace(tablet.tablet_id, std::move(channels));
     }
@@ -495,7 +496,12 @@ Status IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
 }
 
 bool IndexChannel::has_intolerable_failure() {
-    return _failed_channels.size() >= ((_parent->_num_replicas + 1) / 2);
+    for (const auto& it : _failed_channels) {
+        if (it.second.size() >= ((_parent->_num_replicas + 1) / 2)) {
+            return true;
+        }
+    }
+    return false;
 }
 
 OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index f973221..261912d 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -18,11 +18,12 @@
 #pragma once
 
 #include <fmt/format.h>
+
 #include <memory>
 #include <queue>
-#include <set>
 #include <string>
 #include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -203,8 +204,9 @@ public:
         // FIXME(cmy): There is a problem that when calling node_info, the node_info seems not initialized.
         //             But I don't know why. so here I print node_info->id instead of node_info->host
         //             to avoid BE crash. It needs further observation.
-        return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info.id, _node_info.brpc_port);
-    } 
+        return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info.id,
+                           _node_info.brpc_port);
+    }
 
 private:
     void _cancel_with_msg(const std::string& msg);
@@ -281,7 +283,15 @@ public:
         }
     }
 
-    void mark_as_failed(const NodeChannel* ch) { _failed_channels.insert(ch->node_id()); }
+    void mark_as_failed(const NodeChannel* ch) {
+        const auto& it = _tablets_by_channel.find(ch->node_id());
+        if (it == _tablets_by_channel.end()) {
+            return;
+        }
+        for (const auto tablet_id : it->second) {
+            _failed_channels[tablet_id].insert(ch->node_id());
+        }
+    }
     bool has_intolerable_failure();
 
     size_t num_node_channels() const { return _node_channels.size(); }
@@ -295,8 +305,10 @@ private:
     std::unordered_map<int64_t, NodeChannel*> _node_channels;
     // from tablet_id to backend channel
     std::unordered_map<int64_t, std::vector<NodeChannel*>> _channels_by_tablet;
-    // BeId
-    std::set<int64_t> _failed_channels;
+    // from backend channel to tablet_id
+    std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablets_by_channel;
+    // key is tablet_id, value is a set of failed node id
+    std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels;
 };
 
 // Write data to Olap Table.
diff --git a/be/test/olap/generic_iterators_test.cpp b/be/test/olap/generic_iterators_test.cpp
index 072d589..91d4944 100644
--- a/be/test/olap/generic_iterators_test.cpp
+++ b/be/test/olap/generic_iterators_test.cpp
@@ -83,7 +83,8 @@ TEST(GenericIteratorsTest, Union) {
     inputs.push_back(new_auto_increment_iterator(schema, 200));
     inputs.push_back(new_auto_increment_iterator(schema, 300));
 
-    auto iter = new_union_iterator(std::move(inputs), MemTracker::CreateTracker(-1, "UnionIterator", nullptr, false));
+    auto iter = new_union_iterator(std::move(inputs),
+                                   MemTracker::CreateTracker(-1, "UnionIterator", nullptr, false));
     StorageReadOptions opts;
     auto st = iter->init(opts);
     ASSERT_TRUE(st.ok());
@@ -122,7 +123,8 @@ TEST(GenericIteratorsTest, Merge) {
     inputs.push_back(new_auto_increment_iterator(schema, 200));
     inputs.push_back(new_auto_increment_iterator(schema, 300));
 
-    auto iter = new_merge_iterator(std::move(inputs), MemTracker::CreateTracker(-1, "MergeIterator", nullptr, false));
+    auto iter = new_merge_iterator(
+            std::move(inputs), MemTracker::CreateTracker(-1, "MergeIterator", nullptr, false), -1);
     StorageReadOptions opts;
     auto st = iter->init(opts);
     ASSERT_TRUE(st.ok());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org