You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/11/17 02:18:22 UTC
[incubator-doris] branch master updated: mark the load job fail when more than a half of replica write failed of a tablet, (#7126)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4bc5ba8 mark the load job fail when more than a half of replica write failed of a tablet, (#7126)
4bc5ba8 is described below
commit 4bc5ba8819d107445410437b20a8246a3732fceb
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Wed Nov 17 10:18:04 2021 +0800
Mark the load job as failed when more than half of the replica writes of a tablet fail (#7126)
Previously, the code compared the total number of failed node channels against the threshold, instead of counting the failed replicas of each tablet separately.
---
be/src/exec/hdfs_file_reader.cpp | 28 ++++++++++++++++++----------
be/src/exec/tablet_sink.cpp | 10 ++++++++--
be/src/exec/tablet_sink.h | 24 ++++++++++++++++++------
be/test/olap/generic_iterators_test.cpp | 6 ++++--
4 files changed, 48 insertions(+), 20 deletions(-)
diff --git a/be/src/exec/hdfs_file_reader.cpp b/be/src/exec/hdfs_file_reader.cpp
index 0ba1a3a..4399e63 100644
--- a/be/src/exec/hdfs_file_reader.cpp
+++ b/be/src/exec/hdfs_file_reader.cpp
@@ -81,8 +81,10 @@ Status HdfsFileReader::open() {
_hdfs_file = hdfsOpenFile(_hdfs_fs, _path.c_str(), O_RDONLY, 0, 0, 0);
if (_hdfs_file == nullptr) {
std::stringstream ss;
- ss << "open file failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
- << _namenode << _path << ", err: " << strerror(errno);;
+ ss << "open file failed. "
+ << "(BE: " << BackendOptions::get_localhost() << ")" << _namenode << _path
+ << ", err: " << strerror(errno);
+ ;
return Status::InternalError(ss.str());
}
LOG(INFO) << "open file. " << _namenode << _path;
@@ -141,8 +143,10 @@ Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r
int ret = hdfsSeek(_hdfs_fs, _hdfs_file, position);
if (ret != 0) { // check fseek return value
std::stringstream ss;
- ss << "hdfsSeek failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
- << _namenode << _path << ", err: " << strerror(errno);;
+ ss << "hdfsSeek failed. "
+ << "(BE: " << BackendOptions::get_localhost() << ")" << _namenode << _path
+ << ", err: " << strerror(errno);
+ ;
return Status::InternalError(ss.str());
}
}
@@ -150,8 +154,10 @@ Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r
*bytes_read = hdfsRead(_hdfs_fs, _hdfs_file, out, nbytes);
if (*bytes_read < 0) {
std::stringstream ss;
- ss << "Read hdfs file failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
- << _namenode << _path << ", err: " << strerror(errno);;
+ ss << "Read hdfs file failed. "
+ << "(BE: " << BackendOptions::get_localhost() << ")" << _namenode << _path
+ << ", err: " << strerror(errno);
+ ;
return Status::InternalError(ss.str());
}
_current_offset += *bytes_read; // save offset with file
@@ -169,7 +175,9 @@ int64_t HdfsFileReader::size() {
}
hdfsFileInfo* file_info = hdfsGetPathInfo(_hdfs_fs, _path.c_str());
if (file_info == nullptr) {
- LOG(WARNING) << "get path info failed: " << _namenode << _path << ", err: " << strerror(errno);;
+ LOG(WARNING) << "get path info failed: " << _namenode << _path
+ << ", err: " << strerror(errno);
+ ;
close();
return -1;
}
@@ -185,10 +193,10 @@ int64_t HdfsFileReader::size() {
Status HdfsFileReader::seek(int64_t position) {
int res = hdfsSeek(_hdfs_fs, _hdfs_file, position);
if (res != 0) {
- char err_buf[64];
std::stringstream ss;
- ss << "Seek to offset failed. " << "(BE: " << BackendOptions::get_localhost() << ")"
- << " offset=" << position << ", err: " << strerror(errno);
+ ss << "Seek to offset failed. "
+ << "(BE: " << BackendOptions::get_localhost() << ")"
+ << " offset=" << position << ", err: " << strerror(errno);
return Status::InternalError(ss.str());
}
return Status::OK();
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 0ee71c5..3e2db46 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -62,7 +62,7 @@ NodeChannel::~NodeChannel() {
// returned directly via "TabletSink::prepare()" method.
Status NodeChannel::init(RuntimeState* state) {
_tuple_desc = _parent->_output_tuple_desc;
- auto node = _parent->_nodes_info->find_node(_node_id);
+ auto node = _parent->_nodes_info->find_node(_node_id);
if (node == nullptr) {
std::stringstream ss;
ss << "unknown node id, id=" << _node_id;
@@ -462,6 +462,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
}
channel->add_tablet(tablet);
channels.push_back(channel);
+ _tablets_by_channel[node_id].insert(tablet.tablet_id);
}
_channels_by_tablet.emplace(tablet.tablet_id, std::move(channels));
}
@@ -495,7 +496,12 @@ Status IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
}
bool IndexChannel::has_intolerable_failure() {
- return _failed_channels.size() >= ((_parent->_num_replicas + 1) / 2);
+ for (const auto& it : _failed_channels) {
+ if (it.second.size() >= ((_parent->_num_replicas + 1) / 2)) {
+ return true;
+ }
+ }
+ return false;
}
OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index f973221..261912d 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -18,11 +18,12 @@
#pragma once
#include <fmt/format.h>
+
#include <memory>
#include <queue>
-#include <set>
#include <string>
#include <unordered_map>
+#include <unordered_set>
#include <utility>
#include <vector>
@@ -203,8 +204,9 @@ public:
// FIXME(cmy): There is a problem that when calling node_info, the node_info seems not initialized.
// But I don't know why. so here I print node_info->id instead of node_info->host
// to avoid BE crash. It needs further observation.
- return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info.id, _node_info.brpc_port);
- }
+ return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info.id,
+ _node_info.brpc_port);
+ }
private:
void _cancel_with_msg(const std::string& msg);
@@ -281,7 +283,15 @@ public:
}
}
- void mark_as_failed(const NodeChannel* ch) { _failed_channels.insert(ch->node_id()); }
+ void mark_as_failed(const NodeChannel* ch) {
+ const auto& it = _tablets_by_channel.find(ch->node_id());
+ if (it == _tablets_by_channel.end()) {
+ return;
+ }
+ for (const auto tablet_id : it->second) {
+ _failed_channels[tablet_id].insert(ch->node_id());
+ }
+ }
bool has_intolerable_failure();
size_t num_node_channels() const { return _node_channels.size(); }
@@ -295,8 +305,10 @@ private:
std::unordered_map<int64_t, NodeChannel*> _node_channels;
// from tablet_id to backend channel
std::unordered_map<int64_t, std::vector<NodeChannel*>> _channels_by_tablet;
- // BeId
- std::set<int64_t> _failed_channels;
+ // from backend channel to tablet_id
+ std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablets_by_channel;
+ // key is tablet_id, value is a set of failed node id
+ std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels;
};
// Write data to Olap Table.
diff --git a/be/test/olap/generic_iterators_test.cpp b/be/test/olap/generic_iterators_test.cpp
index 072d589..91d4944 100644
--- a/be/test/olap/generic_iterators_test.cpp
+++ b/be/test/olap/generic_iterators_test.cpp
@@ -83,7 +83,8 @@ TEST(GenericIteratorsTest, Union) {
inputs.push_back(new_auto_increment_iterator(schema, 200));
inputs.push_back(new_auto_increment_iterator(schema, 300));
- auto iter = new_union_iterator(std::move(inputs), MemTracker::CreateTracker(-1, "UnionIterator", nullptr, false));
+ auto iter = new_union_iterator(std::move(inputs),
+ MemTracker::CreateTracker(-1, "UnionIterator", nullptr, false));
StorageReadOptions opts;
auto st = iter->init(opts);
ASSERT_TRUE(st.ok());
@@ -122,7 +123,8 @@ TEST(GenericIteratorsTest, Merge) {
inputs.push_back(new_auto_increment_iterator(schema, 200));
inputs.push_back(new_auto_increment_iterator(schema, 300));
- auto iter = new_merge_iterator(std::move(inputs), MemTracker::CreateTracker(-1, "MergeIterator", nullptr, false));
+ auto iter = new_merge_iterator(
+ std::move(inputs), MemTracker::CreateTracker(-1, "MergeIterator", nullptr, false), -1);
StorageReadOptions opts;
auto st = iter->init(opts);
ASSERT_TRUE(st.ok());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org