You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/06/08 04:51:24 UTC

[GitHub] [incubator-doris] zhannngchen commented on a diff in pull request #10007: [Bugfix](load) fix streamload failure due to false unhealthy replica in concurrent stream load

zhannngchen commented on code in PR #10007:
URL: https://github.com/apache/incubator-doris/pull/10007#discussion_r891915838


##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -673,38 +673,50 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
         TPublishVersionRequest publish_version_req;
-        {
-            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
-            if (!_is_work) {
-                return;
-            }
+        bool current_finished = false;
+        std::vector<TTabletId> error_tablet_ids;
+        Status res = Status::OK();
+        while (!current_finished) {
+            {
+                std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
+                while (_is_work && _tasks.empty()) {
+                    _worker_thread_condition_variable.wait(worker_thread_lock);
+                }
+                if (!_is_work) {
+                    return;
+                }
 
-            agent_task_req = _tasks.front();
-            publish_version_req = agent_task_req.publish_version_req;
-            _tasks.pop_front();
-        }
+                agent_task_req = _tasks.front();
+                publish_version_req = agent_task_req.publish_version_req;
+                _tasks.pop_front();
+            }
 
-        DorisMetrics::instance()->publish_task_request_total->increment(1);
-        VLOG_NOTICE << "get publish version task, signature:" << agent_task_req.signature;
+            DorisMetrics::instance()->publish_task_request_total->increment(1);
+            VLOG_NOTICE << "get publish version task, signature:" << agent_task_req.signature;
 
-        std::vector<TTabletId> error_tablet_ids;
-        uint32_t retry_time = 0;
-        Status res = Status::OK();
-        while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
-            error_tablet_ids.clear();
-            EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids);
-            res = _env->storage_engine()->execute_task(&engine_task);
-            if (res.ok()) {
-                break;
-            } else {
-                LOG(WARNING) << "publish version error, retry. [transaction_id="
-                             << publish_version_req.transaction_id
-                             << ", error_tablets_size=" << error_tablet_ids.size() << "]";
-                ++retry_time;
-                std::this_thread::sleep_for(std::chrono::seconds(1));
+            uint32_t retry_time = 0;
+            while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
+                error_tablet_ids.clear();
+                EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids);
+                res = _env->storage_engine()->execute_task(&engine_task);
+                if (res.ok()) {
+                    current_finished = true;
+                    break;
+                } else if (res.precise_code() == OLAP_ERR_ROWSET_VERSION_NOT_CONTINUOUS) {
+                    std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);

Review Comment:
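   Could you add a comment here explaining why `_worker_thread_lock` is re-acquired when publishing fails with `OLAP_ERR_ROWSET_VERSION_NOT_CONTINUOUS`?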



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org