Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/06/08 03:18:05 UTC

[GitHub] [incubator-doris] yixiutt opened a new pull request, #10007: [Bugfix](load) fix replica unhealthy bug in concurrent stream load

yixiutt opened a new pull request, #10007:
URL: https://github.com/apache/incubator-doris/pull/10007

   In concurrent stream load, FE runs publish version tasks concurrently,
   which can cause publish tasks to be handled out of order in BE.
   For example:
   FE publishes tasks with versions 1 2 3 4
   BE may handle the tasks in the sequence 1 2 4 3
   In the case above, when reporting tablet info, BE finds that version 4
   is published but version 3 is not visible, so it reports a version miss to FE.
   FE then sets the replica's lastFailedVersion, which finally makes transaction
   commits fail because there is no quorum of healthy replicas.
   
   Add a strategy to handle this case: BE must publish versions in sequence.
   If a publish task's version does not immediately follow the tablet's
   max_version, the task is pushed back to the queue to wait.
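
The ordering rule described above can be sketched as follows. This is a minimal, single-threaded illustration, not the code from this PR: `TabletSketch` and `publish_in_order` are hypothetical names, and the sketch assumes every version in the range eventually shows up in the queue.

```cpp
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical stand-in for a tablet: tracks only the highest published version.
struct TabletSketch {
    int64_t max_version = 0;
};

// Publishes queued versions strictly in sequence. A task whose version is not
// max_version + 1 is pushed back to the tail of the queue to wait its turn.
// Assumption: every version in the range is eventually present in the queue;
// otherwise this loop would never terminate.
std::vector<int64_t> publish_in_order(TabletSketch& tablet, std::deque<int64_t> tasks) {
    std::vector<int64_t> published;
    while (!tasks.empty()) {
        int64_t version = tasks.front();
        tasks.pop_front();
        if (version == tablet.max_version + 1) {
            tablet.max_version = version;
            published.push_back(version);
        } else {
            tasks.push_back(version);  // not continuous yet: requeue and wait
        }
    }
    return published;
}
```

With tasks arriving as 1 2 4 3 on a tablet whose max_version is 0, version 4 is requeued once and the publish order becomes 1 2 3 4.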
   
   # Proposed changes
   
   Issue Number: close #xxx
   
   ## Problem Summary:
   
   Describe the overview of changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Has unit tests been added: (Yes/No/No Need)
   3. Has document been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] github-actions[bot] commented on pull request #10007: [Bugfix](load) fix streamload failure due to false unhealthy replica in concurrent stream load

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #10007:
URL: https://github.com/apache/incubator-doris/pull/10007#issuecomment-1149534175

   PR approved by anyone and no changes requested.




[GitHub] [incubator-doris] zhannngchen commented on a diff in pull request #10007: [Bugfix](load) fix streamload failure due to false unhealthy replica in concurrent stream load

Posted by GitBox <gi...@apache.org>.
zhannngchen commented on code in PR #10007:
URL: https://github.com/apache/incubator-doris/pull/10007#discussion_r891915838


##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -673,38 +673,50 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
         TPublishVersionRequest publish_version_req;
-        {
-            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
-            if (!_is_work) {
-                return;
-            }
+        bool current_finished = false;
+        std::vector<TTabletId> error_tablet_ids;
+        Status res = Status::OK();
+        while (!current_finished) {
+            {
+                std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
+                while (_is_work && _tasks.empty()) {
+                    _worker_thread_condition_variable.wait(worker_thread_lock);
+                }
+                if (!_is_work) {
+                    return;
+                }
 
-            agent_task_req = _tasks.front();
-            publish_version_req = agent_task_req.publish_version_req;
-            _tasks.pop_front();
-        }
+                agent_task_req = _tasks.front();
+                publish_version_req = agent_task_req.publish_version_req;
+                _tasks.pop_front();
+            }
 
-        DorisMetrics::instance()->publish_task_request_total->increment(1);
-        VLOG_NOTICE << "get publish version task, signature:" << agent_task_req.signature;
+            DorisMetrics::instance()->publish_task_request_total->increment(1);
+            VLOG_NOTICE << "get publish version task, signature:" << agent_task_req.signature;
 
-        std::vector<TTabletId> error_tablet_ids;
-        uint32_t retry_time = 0;
-        Status res = Status::OK();
-        while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
-            error_tablet_ids.clear();
-            EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids);
-            res = _env->storage_engine()->execute_task(&engine_task);
-            if (res.ok()) {
-                break;
-            } else {
-                LOG(WARNING) << "publish version error, retry. [transaction_id="
-                             << publish_version_req.transaction_id
-                             << ", error_tablets_size=" << error_tablet_ids.size() << "]";
-                ++retry_time;
-                std::this_thread::sleep_for(std::chrono::seconds(1));
+            uint32_t retry_time = 0;
+            while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
+                error_tablet_ids.clear();
+                EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids);
+                res = _env->storage_engine()->execute_task(&engine_task);
+                if (res.ok()) {
+                    current_finished = true;
+                    break;
+                } else if (res.precise_code() == OLAP_ERR_ROWSET_VERSION_NOT_CONTINUOUS) {
+                    std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);

Review Comment:
   Add comments here?





[GitHub] [incubator-doris] yixiutt commented on a diff in pull request #10007: [Bugfix](load) fix streamload failure due to false unhealthy replica in concurrent stream load

Posted by GitBox <gi...@apache.org>.
yixiutt commented on code in PR #10007:
URL: https://github.com/apache/incubator-doris/pull/10007#discussion_r891948602


##########
be/src/olap/task/engine_publish_version_task.cpp:
##########
@@ -83,6 +83,12 @@ Status EnginePublishVersionTask::finish() {
                 res = Status::OLAPInternalError(OLAP_ERR_PUSH_TABLE_NOT_EXIST);
                 continue;
             }
+            {
+                std::shared_lock rdlock(tablet->get_header_lock());
+                if (tablet->max_version().second + 1 != par_ver_info.version) {
+                    return Status::OLAPInternalError(OLAP_ERR_ROWSET_VERSION_NOT_CONTINUOUS);

Review Comment:
   We should return an OLAPInternalError here, so I added a new error type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a diff in pull request #10007: [Bugfix](load) fix streamload failure due to false unhealthy replica in concurrent stream load

Posted by GitBox <gi...@apache.org>.
morningman commented on code in PR #10007:
URL: https://github.com/apache/incubator-doris/pull/10007#discussion_r893158734


##########
be/src/olap/tablet.cpp:
##########
@@ -1290,7 +1295,23 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
     Version max_version;
     bool has_version_cross;
     _max_continuous_version_from_beginning_unlocked(&cversion, &max_version, &has_version_cross);
-    tablet_info->__set_version_miss(cversion.second < max_version.second);
+    if (enable_consective_missing_check) {
+        if (cversion.second < max_version.second) {
+            if (_last_missed_version == cversion.second + 1) {
+                if (_last_missed_time_s - MonotonicSeconds() >= 60) {
+                    // version missed for over 60 seconds
+                    tablet_info->__set_version_miss(true);
+                    _last_missed_version = -1;
+                    _last_missed_time_s = 0;
+                }
+            } else {
+                _last_missed_version = cversion.second + 1;
+                _last_missed_time_s = MonotonicSeconds();
+            }
+        } else {

Review Comment:
   This `else` block should pair with `if (enable_consective_missing_check) {`.



##########
be/src/olap/tablet.cpp:
##########
@@ -1277,7 +1279,10 @@ bool Tablet::_contains_rowset(const RowsetId rowset_id) {
     return false;
 }
 
-void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
+// need check if consective version missing in full report
+// alter tablet will ignore this check
+void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
+                                      bool enable_consective_missing_check) {

Review Comment:
   ```suggestion
                                         bool enable_consecutive_missing_check) {
   ```



##########
be/src/olap/tablet.cpp:
##########
@@ -1290,7 +1295,23 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
     Version max_version;
     bool has_version_cross;
     _max_continuous_version_from_beginning_unlocked(&cversion, &max_version, &has_version_cross);
-    tablet_info->__set_version_miss(cversion.second < max_version.second);
+    if (enable_consective_missing_check) {
+        if (cversion.second < max_version.second) {
+            if (_last_missed_version == cversion.second + 1) {
+                if (_last_missed_time_s - MonotonicSeconds() >= 60) {
+                    // version missed for over 60 seconds
+                    tablet_info->__set_version_miss(true);
+                    _last_missed_version = -1;
+                    _last_missed_time_s = 0;
+                }
+            } else {
+                _last_missed_version = cversion.second + 1;
+                _last_missed_time_s = MonotonicSeconds();
+            }
+        } else {

Review Comment:
   And please add a comment here to explain why we need this logic.
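
The intent of the quoted logic, as far as the diff shows, is to report a version miss only when the same gap persists for a while, so that a publish task that is merely late does not mark the replica unhealthy. A minimal sketch of that idea follows. `MissedVersionTracker` and its parameters are hypothetical, the clock is passed in rather than read from `MonotonicSeconds()`, and elapsed time is computed as "now minus first sighting". Unlike the quoted diff, the sketch does not reset its state after reporting.

```cpp
#include <cstdint>

// Hypothetical helper, not the Doris implementation: reports a version as
// missing only after the same gap has been observed for a grace period.
class MissedVersionTracker {
public:
    explicit MissedVersionTracker(int64_t grace_seconds) : _grace_seconds(grace_seconds) {}

    // continuous_end: last version reachable without a gap (cversion.second in the diff).
    // max_end: highest version present (max_version.second in the diff).
    // now_s: monotonic time in seconds, injected so the logic can be tested.
    bool should_report_miss(int64_t continuous_end, int64_t max_end, int64_t now_s) {
        if (continuous_end >= max_end) {
            // No gap: forget any previously observed missing version.
            _last_missed_version = -1;
            _last_missed_time_s = 0;
            return false;
        }
        const int64_t missing = continuous_end + 1;
        if (_last_missed_version != missing) {
            // First sighting of this gap: start the timer, do not report yet.
            _last_missed_version = missing;
            _last_missed_time_s = now_s;
            return false;
        }
        // Same gap seen before: report once it has lasted the whole grace period.
        return now_s - _last_missed_time_s >= _grace_seconds;
    }

private:
    int64_t _grace_seconds;
    int64_t _last_missed_version = -1;
    int64_t _last_missed_time_s = 0;
};
```

Injecting the time makes the 60-second threshold easy to exercise in a unit test without sleeping.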





[GitHub] [incubator-doris] zhannngchen commented on a diff in pull request #10007: [Bugfix](load) fix streamload failure due to false unhealthy replica in concurrent stream load

Posted by GitBox <gi...@apache.org>.
zhannngchen commented on code in PR #10007:
URL: https://github.com/apache/incubator-doris/pull/10007#discussion_r891916251


##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -673,38 +673,50 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
         TPublishVersionRequest publish_version_req;
-        {
-            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
-            if (!_is_work) {
-                return;
-            }
+        bool current_finished = false;
+        std::vector<TTabletId> error_tablet_ids;
+        Status res = Status::OK();
+        while (!current_finished) {
+            {
+                std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
+                while (_is_work && _tasks.empty()) {
+                    _worker_thread_condition_variable.wait(worker_thread_lock);
+                }
+                if (!_is_work) {
+                    return;
+                }
 
-            agent_task_req = _tasks.front();
-            publish_version_req = agent_task_req.publish_version_req;
-            _tasks.pop_front();
-        }
+                agent_task_req = _tasks.front();
+                publish_version_req = agent_task_req.publish_version_req;
+                _tasks.pop_front();
+            }
 
-        DorisMetrics::instance()->publish_task_request_total->increment(1);
-        VLOG_NOTICE << "get publish version task, signature:" << agent_task_req.signature;
+            DorisMetrics::instance()->publish_task_request_total->increment(1);
+            VLOG_NOTICE << "get publish version task, signature:" << agent_task_req.signature;
 
-        std::vector<TTabletId> error_tablet_ids;
-        uint32_t retry_time = 0;
-        Status res = Status::OK();
-        while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
-            error_tablet_ids.clear();
-            EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids);
-            res = _env->storage_engine()->execute_task(&engine_task);
-            if (res.ok()) {
-                break;
-            } else {
-                LOG(WARNING) << "publish version error, retry. [transaction_id="
-                             << publish_version_req.transaction_id
-                             << ", error_tablets_size=" << error_tablet_ids.size() << "]";
-                ++retry_time;
-                std::this_thread::sleep_for(std::chrono::seconds(1));
+            uint32_t retry_time = 0;
+            while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
+                error_tablet_ids.clear();
+                EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids);
+                res = _env->storage_engine()->execute_task(&engine_task);
+                if (res.ok()) {
+                    current_finished = true;
+                    break;
+                } else if (res.precise_code() == OLAP_ERR_ROWSET_VERSION_NOT_CONTINUOUS) {
+                    std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
+                    _tasks.push_back(agent_task_req);

Review Comment:
   We should also add a log here.





[GitHub] [incubator-doris] dataroaring commented on a diff in pull request #10007: [Bugfix](load) fix streamload failure due to false unhealthy replica in concurrent stream load

Posted by GitBox <gi...@apache.org>.
dataroaring commented on code in PR #10007:
URL: https://github.com/apache/incubator-doris/pull/10007#discussion_r891894033


##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -673,38 +673,50 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
         TPublishVersionRequest publish_version_req;
-        {
-            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
-            if (!_is_work) {
-                return;
-            }
+        bool current_finished = false;
+        std::vector<TTabletId> error_tablet_ids;
+        Status res = Status::OK();
+        while (!current_finished) {
+            {

Review Comment:
   Could we remove this while?





[GitHub] [incubator-doris] dataroaring commented on a diff in pull request #10007: [Bugfix](load) fix streamload failure due to false unhealthy replica in concurrent stream load

Posted by GitBox <gi...@apache.org>.
dataroaring commented on code in PR #10007:
URL: https://github.com/apache/incubator-doris/pull/10007#discussion_r891895567


##########
be/src/olap/task/engine_publish_version_task.cpp:
##########
@@ -83,6 +83,12 @@ Status EnginePublishVersionTask::finish() {
                 res = Status::OLAPInternalError(OLAP_ERR_PUSH_TABLE_NOT_EXIST);
                 continue;
             }
+            {
+                std::shared_lock rdlock(tablet->get_header_lock());
+                if (tablet->max_version().second + 1 != par_ver_info.version) {
+                    return Status::OLAPInternalError(OLAP_ERR_ROWSET_VERSION_NOT_CONTINUOUS);

Review Comment:
   Could we use the MISSING_VERSION error here?



##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -707,6 +708,14 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
                 std::this_thread::sleep_for(std::chrono::seconds(1));
             }
         }
+        if (res == OLAP_ERR_ROWSET_VERSION_NOT_CONTINUOUS) {
+            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
+            // push back to task queue if publish version not continuous
+            // make sure publish version task run in order
+            _tasks.push_back(agent_task_req);
+            _worker_thread_condition_variable.notify_one();

Review Comment:
   This thread is working, so we do not need to notify here.





[GitHub] [incubator-doris] github-actions[bot] commented on pull request #10007: [Bugfix](load) fix streamload failure due to false unhealthy replica in concurrent stream load

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #10007:
URL: https://github.com/apache/incubator-doris/pull/10007#issuecomment-1149534145

   PR approved by at least one committer and no changes requested.




[GitHub] [incubator-doris] morningman merged pull request #10007: [Bugfix](load) fix streamload failure due to false unhealthy replica in concurrent stream load

Posted by GitBox <gi...@apache.org>.
morningman merged PR #10007:
URL: https://github.com/apache/incubator-doris/pull/10007




[GitHub] [incubator-doris] yixiutt commented on a diff in pull request #10007: [Bugfix](load) fix streamload failure due to false unhealthy replica in concurrent stream load

Posted by GitBox <gi...@apache.org>.
yixiutt commented on code in PR #10007:
URL: https://github.com/apache/incubator-doris/pull/10007#discussion_r891950832


##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -707,6 +708,14 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
                 std::this_thread::sleep_for(std::chrono::seconds(1));
             }
         }
+        if (res == OLAP_ERR_ROWSET_VERSION_NOT_CONTINUOUS) {
+            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
+            // push back to task queue if publish version not continuous
+            // make sure publish version task run in order
+            _tasks.push_back(agent_task_req);
+            _worker_thread_condition_variable.notify_one();

Review Comment:
   fixed





[GitHub] [incubator-doris] yiguolei commented on a diff in pull request #10007: [Bugfix](load) fix streamload failure due to false unhealthy replica in concurrent stream load

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #10007:
URL: https://github.com/apache/incubator-doris/pull/10007#discussion_r892021391


##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -707,6 +708,13 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
                 std::this_thread::sleep_for(std::chrono::seconds(1));
             }
         }
+        if (res == OLAP_ERR_ROWSET_VERSION_NOT_CONTINUOUS) {
+            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
+            // push back to task queue if publish version not continuous
+            // make sure publish version task run in order
+            _tasks.push_back(agent_task_req);

Review Comment:
   This may cause an infinite loop. If the current tablet is a bad tablet (it really has a missing version), you will keep pushing the task back to the task queue and never publish it. The clone process will then fail.
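
The concern above can be illustrated with a small sketch. If the predecessor version never arrives, an uncapped requeue spins forever. One hypothetical mitigation, not taken from this PR, is to cap the number of requeues per task and give up afterwards. All names here (`PublishTaskSketch`, `PublishResultSketch`, `publish_with_requeue_cap`, `max_requeues`) are made up for illustration.

```cpp
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical task record: the version to publish and how often it was requeued.
struct PublishTaskSketch {
    int64_t version;
    int requeue_count;
};

// Hypothetical result: what was published, what was abandoned, final max version.
struct PublishResultSketch {
    std::vector<int64_t> published;
    std::vector<int64_t> gave_up;
    int64_t max_version;
};

// Publishes in sequence, but requeues an out-of-order task at most
// max_requeues times. Without the cap, a tablet that truly lacks a version
// would keep its later tasks cycling through the queue indefinitely.
PublishResultSketch publish_with_requeue_cap(int64_t max_version,
                                             std::deque<PublishTaskSketch> tasks,
                                             int max_requeues) {
    PublishResultSketch result;
    result.max_version = max_version;
    while (!tasks.empty()) {
        PublishTaskSketch task = tasks.front();
        tasks.pop_front();
        if (task.version == result.max_version + 1) {
            result.max_version = task.version;
            result.published.push_back(task.version);
        } else if (task.requeue_count < max_requeues) {
            ++task.requeue_count;
            tasks.push_back(task);  // wait for the predecessor version
        } else {
            result.gave_up.push_back(task.version);  // predecessor never showed up
        }
    }
    return result;
}
```

What a real backend should do with an abandoned task (report the failure, trigger a clone, or publish anyway) is a separate design decision that this sketch does not address.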


