You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/02/04 04:51:39 UTC

[GitHub] [incubator-doris] lingbin opened a new pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

lingbin opened a new pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831
 
 
   In `AgentServer`, each task type needs to be processed separately,
   which leads to very long code that is hard to read and in which errors
   are not easy to detect (for example, the handling of some task type may
   be missing, or the correspondence between a task type and its handler
   may be wrong).
   
   Fortunately, the code for each task_type is very similar, so this
   is a good case for using a `MACRO`, which can greatly reduce the
   repeated code and solve the above problems.
   
   This patch also fixes two small bugs:
   1. The `_topic_subscriber` member was not released in the destructor.
   2. In `submit_tasks()`, the `status_code` was not reset before
      each task was processed, resulting in a wrong judgment.
   
   No functional changes in this patch.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

Posted by GitBox <gi...@apache.org>.
lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831#discussion_r374596432
 
 

 ##########
 File path: be/src/agent/agent_server.cpp
 ##########
 @@ -67,438 +47,236 @@ AgentServer::AgentServer(ExecEnv* exec_env,
                 boost::filesystem::remove_all(dpp_download_path);
             }
         } catch (...) {
-            LOG(WARNING) << "boost exception when remove dpp download path. path="
-                         << path.path;
+            LOG(WARNING) << "boost exception when remove dpp download path. path=" << path.path;
         }
     }
 
-    // init task worker pool
-    _create_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
-            _exec_env,
-            master_info);
-    _drop_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DROP_TABLE,
-            _exec_env,
-            master_info);
-    _push_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUSH,
-            _exec_env,
-            master_info);
-    _publish_version_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUBLISH_VERSION,
-            _exec_env,
-            master_info);
-    _clear_transaction_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLEAR_TRANSACTION_TASK,
-            exec_env,
-            master_info);
-    _delete_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DELETE,
-            _exec_env,
-            master_info);
-    _alter_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
-            _exec_env,
-            master_info);
-    _clone_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLONE,
-            _exec_env,
-            master_info);
-    _storage_medium_migrate_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::STORAGE_MEDIUM_MIGRATE,
-            _exec_env,
-            master_info);
-    _check_consistency_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CHECK_CONSISTENCY,
-            _exec_env,
-            master_info);
-    _report_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_TASK,
-            _exec_env,
-            master_info);
-    _report_disk_state_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE,
-            _exec_env,
-            master_info);
-    _report_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE,
-            _exec_env,
-            master_info);
-    _upload_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPLOAD,
-            _exec_env,
-            master_info);
-    _download_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DOWNLOAD,
-            _exec_env,
-            master_info);
-    _make_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _release_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _move_dir_workers= new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MOVE,
-            _exec_env,
-            master_info);
-    _recover_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RECOVER_TABLET,
-            _exec_env,
-            master_info);
-    _update_tablet_meta_info_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO,
-            _exec_env,
-            master_info);
+    // It is the same code to create workers of each type, so we use a macro
+    // to make code to be more readable.
+
+#ifndef BE_TEST
+#define CREATE_AND_START_POOL(type, pool_name)         \
+    pool_name.reset(new TaskWorkerPool(                \
+                TaskWorkerPool::TaskWorkerType::type,  \
+                _exec_env,                             \
+                master_info));                         \
+    pool_name->start();
+#else
+#define CREATE_AND_START_POOL(type, pool_name)
+#endif // BE_TEST
+
+    CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
+    CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
+    // Both PUSH and REALTIME_PUSH type use _push_workers
+    CREATE_AND_START_POOL(PUSH, _push_workers);
+    CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
+    CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers);
+    CREATE_AND_START_POOL(DELETE, _delete_workers);
+    CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
+    CREATE_AND_START_POOL(CLONE, _clone_workers);
+    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers);
+    CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
+    CREATE_AND_START_POOL(REPORT_TASK, _report_task_workers);
+    CREATE_AND_START_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
+    CREATE_AND_START_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
+    CREATE_AND_START_POOL(UPLOAD, _upload_workers);
+    CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
+    CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers);
+    CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
+    CREATE_AND_START_POOL(MOVE, _move_dir_workers);
+    CREATE_AND_START_POOL(RECOVER_TABLET, _recover_tablet_workers);
+    CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers);
+#undef CREATE_AND_START_POOL
+
 #ifndef BE_TEST
-    _create_tablet_workers->start();
-    _drop_tablet_workers->start();
-    _push_workers->start();
-    _publish_version_workers->start();
-    _clear_transaction_task_workers->start();
-    _delete_workers->start();
-    _alter_tablet_workers->start();
-    _clone_workers->start();
-    _storage_medium_migrate_workers->start();
-    _check_consistency_workers->start();
-    _report_task_workers->start();
-    _report_disk_state_workers->start();
-    _report_tablet_workers->start();
-    _upload_workers->start();
-    _download_workers->start();
-    _make_snapshot_workers->start();
-    _release_snapshot_workers->start();
-    _move_dir_workers->start();
-    _recover_tablet_workers->start();
-    _update_tablet_meta_info_workers->start();
     // Add subscriber here and register listeners
     TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
     LOG(INFO) << "Register user resource listener";
     _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener);
 #endif
 }
 
-AgentServer::~AgentServer() {
-    if (_create_tablet_workers != NULL) {
-        delete _create_tablet_workers;
-    }
-    if (_drop_tablet_workers != NULL) {
-        delete _drop_tablet_workers;
-    }
-    if (_push_workers != NULL) {
-        delete _push_workers;
-    }
-    if (_publish_version_workers != NULL) {
-        delete _publish_version_workers;
-    }
-    if (_clear_transaction_task_workers != NULL) {
-        delete _clear_transaction_task_workers;
-    }
-    if (_delete_workers != NULL) {
-        delete _delete_workers;
-    }
-    if (_alter_tablet_workers != NULL) {
-        delete _alter_tablet_workers;
-    }
-    if (_clone_workers != NULL) {
-        delete _clone_workers;
-    }
-    if (_storage_medium_migrate_workers != NULL) {
-        delete _storage_medium_migrate_workers;
-    }
-    if (_check_consistency_workers != NULL) {
-        delete _check_consistency_workers;
-    }
-    if (_report_task_workers != NULL) {
-        delete _report_task_workers;
-    }
-    if (_report_disk_state_workers != NULL) {
-        delete _report_disk_state_workers;
-    }
-    if (_report_tablet_workers != NULL) {
-        delete _report_tablet_workers;
-    }
-    if (_upload_workers != NULL) {
-        delete _upload_workers;
-    }
-    if (_download_workers != NULL) {
-        delete _download_workers;
-    }
-    if (_make_snapshot_workers != NULL) {
-        delete _make_snapshot_workers;
-    }
-    if (_move_dir_workers!= NULL) {
-        delete _move_dir_workers;
-    }
-    if (_recover_tablet_workers != NULL) {
-        delete _recover_tablet_workers;
-    }
-
-    if (_update_tablet_meta_info_workers != NULL) {
-        delete _update_tablet_meta_info_workers;
-    }
-    if (_release_snapshot_workers != NULL) {
-        delete _release_snapshot_workers;
-    }
-    if (_topic_subscriber !=NULL) {
-        delete _topic_subscriber;
-    }
-}
-
-void AgentServer::submit_tasks(
-        TAgentResult& return_value,
-        const vector<TAgentTaskRequest>& tasks) {
+AgentServer::~AgentServer() { }
 
-    // Set result to dm
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
+void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) {
+    Status ret_st;
 
-    // TODO check require master same to heartbeat master
-    if (_master_info.network_address.hostname == ""
-            || _master_info.network_address.port == 0) {
-        error_msgs.push_back("Not get master heartbeat yet.");
-        return_value.status.__set_error_msgs(error_msgs);
-        return_value.status.__set_status_code(TStatusCode::CANCELLED);
+    // TODO check master_info here if it is the same with that of heartbeat rpc
+    if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
+        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
+        ret_st.to_thrift(&agent_result.status);
         return;
     }
 
     for (auto task : tasks) {
+        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
         TTaskType::type task_type = task.task_type;
         int64_t signature = task.signature;
 
+#define HANDLE_TYPE(t_task_type, work_pool, req_member)                     \
+    case t_task_type:                                                       \
+        if (task.__isset.req_member) {                                      \
+            work_pool->submit_task(task);                                   \
+        }                                                                   \
 
 Review comment:
   yes, I will fix it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

Posted by GitBox <gi...@apache.org>.
lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831#discussion_r374600389
 
 

 ##########
 File path: be/src/agent/agent_server.cpp
 ##########
 @@ -67,438 +47,236 @@ AgentServer::AgentServer(ExecEnv* exec_env,
                 boost::filesystem::remove_all(dpp_download_path);
             }
         } catch (...) {
-            LOG(WARNING) << "boost exception when remove dpp download path. path="
-                         << path.path;
+            LOG(WARNING) << "boost exception when remove dpp download path. path=" << path.path;
         }
     }
 
-    // init task worker pool
-    _create_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
-            _exec_env,
-            master_info);
-    _drop_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DROP_TABLE,
-            _exec_env,
-            master_info);
-    _push_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUSH,
-            _exec_env,
-            master_info);
-    _publish_version_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUBLISH_VERSION,
-            _exec_env,
-            master_info);
-    _clear_transaction_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLEAR_TRANSACTION_TASK,
-            exec_env,
-            master_info);
-    _delete_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DELETE,
-            _exec_env,
-            master_info);
-    _alter_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
-            _exec_env,
-            master_info);
-    _clone_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLONE,
-            _exec_env,
-            master_info);
-    _storage_medium_migrate_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::STORAGE_MEDIUM_MIGRATE,
-            _exec_env,
-            master_info);
-    _check_consistency_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CHECK_CONSISTENCY,
-            _exec_env,
-            master_info);
-    _report_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_TASK,
-            _exec_env,
-            master_info);
-    _report_disk_state_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE,
-            _exec_env,
-            master_info);
-    _report_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE,
-            _exec_env,
-            master_info);
-    _upload_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPLOAD,
-            _exec_env,
-            master_info);
-    _download_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DOWNLOAD,
-            _exec_env,
-            master_info);
-    _make_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _release_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _move_dir_workers= new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MOVE,
-            _exec_env,
-            master_info);
-    _recover_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RECOVER_TABLET,
-            _exec_env,
-            master_info);
-    _update_tablet_meta_info_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO,
-            _exec_env,
-            master_info);
+    // It is the same code to create workers of each type, so we use a macro
+    // to make code to be more readable.
+
+#ifndef BE_TEST
+#define CREATE_AND_START_POOL(type, pool_name)         \
+    pool_name.reset(new TaskWorkerPool(                \
+                TaskWorkerPool::TaskWorkerType::type,  \
+                _exec_env,                             \
+                master_info));                         \
+    pool_name->start();
+#else
+#define CREATE_AND_START_POOL(type, pool_name)
+#endif // BE_TEST
+
+    CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
+    CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
+    // Both PUSH and REALTIME_PUSH type use _push_workers
+    CREATE_AND_START_POOL(PUSH, _push_workers);
+    CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
+    CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers);
+    CREATE_AND_START_POOL(DELETE, _delete_workers);
+    CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
+    CREATE_AND_START_POOL(CLONE, _clone_workers);
+    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers);
+    CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
+    CREATE_AND_START_POOL(REPORT_TASK, _report_task_workers);
+    CREATE_AND_START_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
+    CREATE_AND_START_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
+    CREATE_AND_START_POOL(UPLOAD, _upload_workers);
+    CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
+    CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers);
+    CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
+    CREATE_AND_START_POOL(MOVE, _move_dir_workers);
+    CREATE_AND_START_POOL(RECOVER_TABLET, _recover_tablet_workers);
+    CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers);
+#undef CREATE_AND_START_POOL
+
 #ifndef BE_TEST
-    _create_tablet_workers->start();
-    _drop_tablet_workers->start();
-    _push_workers->start();
-    _publish_version_workers->start();
-    _clear_transaction_task_workers->start();
-    _delete_workers->start();
-    _alter_tablet_workers->start();
-    _clone_workers->start();
-    _storage_medium_migrate_workers->start();
-    _check_consistency_workers->start();
-    _report_task_workers->start();
-    _report_disk_state_workers->start();
-    _report_tablet_workers->start();
-    _upload_workers->start();
-    _download_workers->start();
-    _make_snapshot_workers->start();
-    _release_snapshot_workers->start();
-    _move_dir_workers->start();
-    _recover_tablet_workers->start();
-    _update_tablet_meta_info_workers->start();
     // Add subscriber here and register listeners
     TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
     LOG(INFO) << "Register user resource listener";
     _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener);
 #endif
 }
 
-AgentServer::~AgentServer() {
-    if (_create_tablet_workers != NULL) {
-        delete _create_tablet_workers;
-    }
-    if (_drop_tablet_workers != NULL) {
-        delete _drop_tablet_workers;
-    }
-    if (_push_workers != NULL) {
-        delete _push_workers;
-    }
-    if (_publish_version_workers != NULL) {
-        delete _publish_version_workers;
-    }
-    if (_clear_transaction_task_workers != NULL) {
-        delete _clear_transaction_task_workers;
-    }
-    if (_delete_workers != NULL) {
-        delete _delete_workers;
-    }
-    if (_alter_tablet_workers != NULL) {
-        delete _alter_tablet_workers;
-    }
-    if (_clone_workers != NULL) {
-        delete _clone_workers;
-    }
-    if (_storage_medium_migrate_workers != NULL) {
-        delete _storage_medium_migrate_workers;
-    }
-    if (_check_consistency_workers != NULL) {
-        delete _check_consistency_workers;
-    }
-    if (_report_task_workers != NULL) {
-        delete _report_task_workers;
-    }
-    if (_report_disk_state_workers != NULL) {
-        delete _report_disk_state_workers;
-    }
-    if (_report_tablet_workers != NULL) {
-        delete _report_tablet_workers;
-    }
-    if (_upload_workers != NULL) {
-        delete _upload_workers;
-    }
-    if (_download_workers != NULL) {
-        delete _download_workers;
-    }
-    if (_make_snapshot_workers != NULL) {
-        delete _make_snapshot_workers;
-    }
-    if (_move_dir_workers!= NULL) {
-        delete _move_dir_workers;
-    }
-    if (_recover_tablet_workers != NULL) {
-        delete _recover_tablet_workers;
-    }
-
-    if (_update_tablet_meta_info_workers != NULL) {
-        delete _update_tablet_meta_info_workers;
-    }
-    if (_release_snapshot_workers != NULL) {
-        delete _release_snapshot_workers;
-    }
-    if (_topic_subscriber !=NULL) {
-        delete _topic_subscriber;
-    }
-}
-
-void AgentServer::submit_tasks(
-        TAgentResult& return_value,
-        const vector<TAgentTaskRequest>& tasks) {
+AgentServer::~AgentServer() { }
 
-    // Set result to dm
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
+void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) {
+    Status ret_st;
 
-    // TODO check require master same to heartbeat master
-    if (_master_info.network_address.hostname == ""
-            || _master_info.network_address.port == 0) {
-        error_msgs.push_back("Not get master heartbeat yet.");
-        return_value.status.__set_error_msgs(error_msgs);
-        return_value.status.__set_status_code(TStatusCode::CANCELLED);
+    // TODO check master_info here if it is the same with that of heartbeat rpc
+    if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
+        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
+        ret_st.to_thrift(&agent_result.status);
         return;
     }
 
     for (auto task : tasks) {
+        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
         TTaskType::type task_type = task.task_type;
         int64_t signature = task.signature;
 
+#define HANDLE_TYPE(t_task_type, work_pool, req_member)                     \
+    case t_task_type:                                                       \
+        if (task.__isset.req_member) {                                      \
+            work_pool->submit_task(task);                                   \
+        }                                                                   \
+        ret_st = Status::InvalidArgument(strings::Substitute(               \
+                "task(signature=$0) has wrong request member", signature)); \
+        break;
+
+        // TODO(lingbin): divided these task types into several categories
         switch (task_type) {
-        case TTaskType::CREATE:
-            if (task.__isset.create_tablet_req) {
-               _create_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DROP:
-            if (task.__isset.drop_tablet_req) {
-                _drop_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
+        HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, create_tablet_req);
+        HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, drop_tablet_req);
+        HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, publish_version_req);
+        HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK,
+                    _clear_transaction_task_workers,
+                    clear_transaction_task_req);
+        HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
+        HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE,
+                    _storage_medium_migrate_workers,
+                    storage_medium_migrate_req);
+        HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY,
+                    _check_consistency_workers,
+                    check_consistency_req);
+        HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
+        HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
+        HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, snapshot_req);
+        HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers, release_snapshot_req);
+        HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
+        HANDLE_TYPE(TTaskType::RECOVER_TABLET, _recover_tablet_workers, recover_tablet_req);
+        HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO,
+                    _update_tablet_meta_info_workers,
+                    update_tablet_meta_info_req);
+
         case TTaskType::REALTIME_PUSH:
         case TTaskType::PUSH:
-            if (task.__isset.push_req) {
-                if (task.push_req.push_type == TPushType::LOAD
-                        || task.push_req.push_type == TPushType::LOAD_DELETE) {
-                    _push_workers->submit_task(task);
-                } else if (task.push_req.push_type == TPushType::DELETE) {
-                    _delete_workers->submit_task(task);
-                } else {
-                    status_code = TStatusCode::ANALYSIS_ERROR;
-                }
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+            if (!task.__isset.push_req) {
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
+                break;
             }
-            break;
-        case TTaskType::PUBLISH_VERSION:
-            if (task.__isset.publish_version_req) {
-                _publish_version_workers->submit_task(task);
+            if (task.push_req.push_type == TPushType::LOAD
+                    || task.push_req.push_type == TPushType::LOAD_DELETE) {
+                _push_workers->submit_task(task);
+            } else if (task.push_req.push_type == TPushType::DELETE) {
+                _delete_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLEAR_TRANSACTION_TASK:
-            if (task.__isset.clear_transaction_task_req) {
-                _clear_transaction_task_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0, type=$1, push_type=$2) has wrong push_type",
+                        signature, task_type, task.push_req.push_type));
             }
             break;
         case TTaskType::ALTER:
             if (task.__isset.alter_tablet_req || task.__isset.alter_tablet_req_v2) {
                 _alter_tablet_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLONE:
-            if (task.__isset.clone_req) {
-                _clone_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::STORAGE_MEDIUM_MIGRATE:
-            if (task.__isset.storage_medium_migrate_req) {
-                _storage_medium_migrate_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CHECK_CONSISTENCY:
-            if (task.__isset.check_consistency_req) {
-                _check_consistency_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPLOAD:
-            if (task.__isset.upload_req) {
-                _upload_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DOWNLOAD:
-            if (task.__isset.download_req) {
-                _download_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MAKE_SNAPSHOT:
-            if (task.__isset.snapshot_req) {
-                _make_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RELEASE_SNAPSHOT:
-            if (task.__isset.release_snapshot_req) {
-                _release_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MOVE:
-            if (task.__isset.move_dir_req) {
-                _move_dir_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RECOVER_TABLET:
-            if (task.__isset.recover_tablet_req) {
-                _recover_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPDATE_TABLET_META_INFO:
-            if (task.__isset.update_tablet_meta_info_req) {
-                _update_tablet_meta_info_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
             }
             break;
         default:
-            status_code = TStatusCode::ANALYSIS_ERROR;
+            ret_st = Status::InvalidArgument(strings::Substitute(
+                    "task(signature=$0, type=$1) has wrong task type", signature, task_type));
             break;
         }
+#undef HANDLE_TYPE
 
-        if (status_code == TStatusCode::ANALYSIS_ERROR) {
-            OLAP_LOG_WARNING("task anaysis_error, signature: %ld", signature);
-            error_msgs.push_back("the task signature is:" + to_string(signature) + " has wrong request.");
+        if (!ret_st.ok()) {
+            LOG(WARNING) << "fail to submit task. reason: " << ret_st.get_error_msg()
+                    << ", task: " << task;
+            break;
 
 Review comment:
   So if something goes wrong, doesn't the FE resend the requests?
   
   An ideal solution would be for each task in a batch to have its own status, rather than the whole batch sharing a single status, but this is something that can be improved in the future.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

Posted by GitBox <gi...@apache.org>.
lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831#discussion_r374597645
 
 

 ##########
 File path: be/src/agent/agent_server.cpp
 ##########
 @@ -67,438 +47,236 @@ AgentServer::AgentServer(ExecEnv* exec_env,
                 boost::filesystem::remove_all(dpp_download_path);
             }
         } catch (...) {
-            LOG(WARNING) << "boost exception when remove dpp download path. path="
-                         << path.path;
+            LOG(WARNING) << "boost exception when remove dpp download path. path=" << path.path;
         }
     }
 
-    // init task worker pool
-    _create_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
-            _exec_env,
-            master_info);
-    _drop_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DROP_TABLE,
-            _exec_env,
-            master_info);
-    _push_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUSH,
-            _exec_env,
-            master_info);
-    _publish_version_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUBLISH_VERSION,
-            _exec_env,
-            master_info);
-    _clear_transaction_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLEAR_TRANSACTION_TASK,
-            exec_env,
-            master_info);
-    _delete_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DELETE,
-            _exec_env,
-            master_info);
-    _alter_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
-            _exec_env,
-            master_info);
-    _clone_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLONE,
-            _exec_env,
-            master_info);
-    _storage_medium_migrate_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::STORAGE_MEDIUM_MIGRATE,
-            _exec_env,
-            master_info);
-    _check_consistency_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CHECK_CONSISTENCY,
-            _exec_env,
-            master_info);
-    _report_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_TASK,
-            _exec_env,
-            master_info);
-    _report_disk_state_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE,
-            _exec_env,
-            master_info);
-    _report_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE,
-            _exec_env,
-            master_info);
-    _upload_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPLOAD,
-            _exec_env,
-            master_info);
-    _download_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DOWNLOAD,
-            _exec_env,
-            master_info);
-    _make_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _release_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _move_dir_workers= new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MOVE,
-            _exec_env,
-            master_info);
-    _recover_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RECOVER_TABLET,
-            _exec_env,
-            master_info);
-    _update_tablet_meta_info_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO,
-            _exec_env,
-            master_info);
+    // It is the same code to create workers of each type, so we use a macro
+    // to make code to be more readable.
+
+#ifndef BE_TEST
+#define CREATE_AND_START_POOL(type, pool_name)         \
+    pool_name.reset(new TaskWorkerPool(                \
+                TaskWorkerPool::TaskWorkerType::type,  \
+                _exec_env,                             \
+                master_info));                         \
+    pool_name->start();
+#else
+#define CREATE_AND_START_POOL(type, pool_name)
+#endif // BE_TEST
+
+    CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
+    CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
+    // Both PUSH and REALTIME_PUSH type use _push_workers
+    CREATE_AND_START_POOL(PUSH, _push_workers);
+    CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
+    CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers);
+    CREATE_AND_START_POOL(DELETE, _delete_workers);
+    CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
+    CREATE_AND_START_POOL(CLONE, _clone_workers);
+    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers);
+    CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
+    CREATE_AND_START_POOL(REPORT_TASK, _report_task_workers);
+    CREATE_AND_START_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
+    CREATE_AND_START_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
+    CREATE_AND_START_POOL(UPLOAD, _upload_workers);
+    CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
+    CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers);
+    CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
+    CREATE_AND_START_POOL(MOVE, _move_dir_workers);
+    CREATE_AND_START_POOL(RECOVER_TABLET, _recover_tablet_workers);
+    CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers);
+#undef CREATE_AND_START_POOL
+
 #ifndef BE_TEST
-    _create_tablet_workers->start();
-    _drop_tablet_workers->start();
-    _push_workers->start();
-    _publish_version_workers->start();
-    _clear_transaction_task_workers->start();
-    _delete_workers->start();
-    _alter_tablet_workers->start();
-    _clone_workers->start();
-    _storage_medium_migrate_workers->start();
-    _check_consistency_workers->start();
-    _report_task_workers->start();
-    _report_disk_state_workers->start();
-    _report_tablet_workers->start();
-    _upload_workers->start();
-    _download_workers->start();
-    _make_snapshot_workers->start();
-    _release_snapshot_workers->start();
-    _move_dir_workers->start();
-    _recover_tablet_workers->start();
-    _update_tablet_meta_info_workers->start();
     // Add subscriber here and register listeners
     TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
     LOG(INFO) << "Register user resource listener";
     _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener);
 #endif
 }
 
-AgentServer::~AgentServer() {
-    if (_create_tablet_workers != NULL) {
-        delete _create_tablet_workers;
-    }
-    if (_drop_tablet_workers != NULL) {
-        delete _drop_tablet_workers;
-    }
-    if (_push_workers != NULL) {
-        delete _push_workers;
-    }
-    if (_publish_version_workers != NULL) {
-        delete _publish_version_workers;
-    }
-    if (_clear_transaction_task_workers != NULL) {
-        delete _clear_transaction_task_workers;
-    }
-    if (_delete_workers != NULL) {
-        delete _delete_workers;
-    }
-    if (_alter_tablet_workers != NULL) {
-        delete _alter_tablet_workers;
-    }
-    if (_clone_workers != NULL) {
-        delete _clone_workers;
-    }
-    if (_storage_medium_migrate_workers != NULL) {
-        delete _storage_medium_migrate_workers;
-    }
-    if (_check_consistency_workers != NULL) {
-        delete _check_consistency_workers;
-    }
-    if (_report_task_workers != NULL) {
-        delete _report_task_workers;
-    }
-    if (_report_disk_state_workers != NULL) {
-        delete _report_disk_state_workers;
-    }
-    if (_report_tablet_workers != NULL) {
-        delete _report_tablet_workers;
-    }
-    if (_upload_workers != NULL) {
-        delete _upload_workers;
-    }
-    if (_download_workers != NULL) {
-        delete _download_workers;
-    }
-    if (_make_snapshot_workers != NULL) {
-        delete _make_snapshot_workers;
-    }
-    if (_move_dir_workers!= NULL) {
-        delete _move_dir_workers;
-    }
-    if (_recover_tablet_workers != NULL) {
-        delete _recover_tablet_workers;
-    }
-
-    if (_update_tablet_meta_info_workers != NULL) {
-        delete _update_tablet_meta_info_workers;
-    }
-    if (_release_snapshot_workers != NULL) {
-        delete _release_snapshot_workers;
-    }
-    if (_topic_subscriber !=NULL) {
-        delete _topic_subscriber;
-    }
-}
-
-void AgentServer::submit_tasks(
-        TAgentResult& return_value,
-        const vector<TAgentTaskRequest>& tasks) {
+AgentServer::~AgentServer() { }
 
-    // Set result to dm
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
+void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) {
+    Status ret_st;
 
-    // TODO check require master same to heartbeat master
-    if (_master_info.network_address.hostname == ""
-            || _master_info.network_address.port == 0) {
-        error_msgs.push_back("Not get master heartbeat yet.");
-        return_value.status.__set_error_msgs(error_msgs);
-        return_value.status.__set_status_code(TStatusCode::CANCELLED);
+    // TODO check master_info here if it is the same with that of heartbeat rpc
+    if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
+        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
+        ret_st.to_thrift(&agent_result.status);
         return;
     }
 
     for (auto task : tasks) {
+        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
         TTaskType::type task_type = task.task_type;
         int64_t signature = task.signature;
 
+#define HANDLE_TYPE(t_task_type, work_pool, req_member)                     \
+    case t_task_type:                                                       \
+        if (task.__isset.req_member) {                                      \
+            work_pool->submit_task(task);                                   \
+        }                                                                   \
+        ret_st = Status::InvalidArgument(strings::Substitute(               \
+                "task(signature=$0) has wrong request member", signature)); \
+        break;
+
+        // TODO(lingbin): divided these task types into several categories
         switch (task_type) {
-        case TTaskType::CREATE:
-            if (task.__isset.create_tablet_req) {
-               _create_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DROP:
-            if (task.__isset.drop_tablet_req) {
-                _drop_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
+        HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, create_tablet_req);
+        HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, drop_tablet_req);
+        HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, publish_version_req);
+        HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK,
+                    _clear_transaction_task_workers,
+                    clear_transaction_task_req);
+        HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
+        HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE,
+                    _storage_medium_migrate_workers,
+                    storage_medium_migrate_req);
+        HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY,
+                    _check_consistency_workers,
+                    check_consistency_req);
+        HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
+        HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
+        HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, snapshot_req);
+        HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers, release_snapshot_req);
+        HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
+        HANDLE_TYPE(TTaskType::RECOVER_TABLET, _recover_tablet_workers, recover_tablet_req);
+        HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO,
+                    _update_tablet_meta_info_workers,
+                    update_tablet_meta_info_req);
+
         case TTaskType::REALTIME_PUSH:
         case TTaskType::PUSH:
-            if (task.__isset.push_req) {
-                if (task.push_req.push_type == TPushType::LOAD
-                        || task.push_req.push_type == TPushType::LOAD_DELETE) {
-                    _push_workers->submit_task(task);
-                } else if (task.push_req.push_type == TPushType::DELETE) {
-                    _delete_workers->submit_task(task);
-                } else {
-                    status_code = TStatusCode::ANALYSIS_ERROR;
-                }
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+            if (!task.__isset.push_req) {
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
+                break;
             }
-            break;
-        case TTaskType::PUBLISH_VERSION:
-            if (task.__isset.publish_version_req) {
-                _publish_version_workers->submit_task(task);
+            if (task.push_req.push_type == TPushType::LOAD
+                    || task.push_req.push_type == TPushType::LOAD_DELETE) {
+                _push_workers->submit_task(task);
+            } else if (task.push_req.push_type == TPushType::DELETE) {
+                _delete_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLEAR_TRANSACTION_TASK:
-            if (task.__isset.clear_transaction_task_req) {
-                _clear_transaction_task_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0, type=$1, push_type=$2) has wrong push_type",
+                        signature, task_type, task.push_req.push_type));
             }
             break;
         case TTaskType::ALTER:
             if (task.__isset.alter_tablet_req || task.__isset.alter_tablet_req_v2) {
                 _alter_tablet_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLONE:
-            if (task.__isset.clone_req) {
-                _clone_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::STORAGE_MEDIUM_MIGRATE:
-            if (task.__isset.storage_medium_migrate_req) {
-                _storage_medium_migrate_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CHECK_CONSISTENCY:
-            if (task.__isset.check_consistency_req) {
-                _check_consistency_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPLOAD:
-            if (task.__isset.upload_req) {
-                _upload_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DOWNLOAD:
-            if (task.__isset.download_req) {
-                _download_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MAKE_SNAPSHOT:
-            if (task.__isset.snapshot_req) {
-                _make_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RELEASE_SNAPSHOT:
-            if (task.__isset.release_snapshot_req) {
-                _release_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MOVE:
-            if (task.__isset.move_dir_req) {
-                _move_dir_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RECOVER_TABLET:
-            if (task.__isset.recover_tablet_req) {
-                _recover_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPDATE_TABLET_META_INFO:
-            if (task.__isset.update_tablet_meta_info_req) {
-                _update_tablet_meta_info_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
             }
             break;
         default:
-            status_code = TStatusCode::ANALYSIS_ERROR;
+            ret_st = Status::InvalidArgument(strings::Substitute(
+                    "task(signature=$0, type=$1) has wrong task type", signature, task_type));
             break;
         }
+#undef HANDLE_TYPE
 
-        if (status_code == TStatusCode::ANALYSIS_ERROR) {
-            OLAP_LOG_WARNING("task anaysis_error, signature: %ld", signature);
-            error_msgs.push_back("the task signature is:" + to_string(signature) + " has wrong request.");
+        if (!ret_st.ok()) {
+            LOG(WARNING) << "fail to submit task. reason: " << ret_st.get_error_msg()
+                    << ", task: " << task;
+            break;
         }
     }
 
-    return_value.status.__set_error_msgs(error_msgs);
-    return_value.status.__set_status_code(status_code);
+    ret_st.to_thrift(&agent_result.status);
 }
 
-void AgentServer::make_snapshot(TAgentResult& return_value,
-        const TSnapshotRequest& snapshot_request) {
-    TStatus status;
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
-    return_value.__set_snapshot_version(snapshot_request.preferred_snapshot_version);
+void AgentServer::make_snapshot(TAgentResult& t_agent_result,
+                                const TSnapshotRequest& snapshot_request) {
+    Status ret_st;
     string snapshot_path;
-    OLAPStatus make_snapshot_status =
+    OLAPStatus err_code =
             SnapshotManager::instance()->make_snapshot(snapshot_request, &snapshot_path);
-    if (make_snapshot_status != OLAP_SUCCESS) {
-        status_code = TStatusCode::RUNTIME_ERROR;
-        OLAP_LOG_WARNING("make_snapshot failed. tablet_id: %ld, schema_hash: %ld, status: %d",
-                         snapshot_request.tablet_id, snapshot_request.schema_hash,
-                         make_snapshot_status);
-        error_msgs.push_back("make_snapshot failed. status: " +
-                             boost::lexical_cast<string>(make_snapshot_status));
+    if (err_code != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to make_snapshot. tablet_id=" << snapshot_request.tablet_id
+                     << ", schema_hash=" << snapshot_request.schema_hash
+                     << ", error_code=" << err_code;
+        ret_st = Status::RuntimeError(strings::Substitute(
+                    "fail to make_snapshot. err_code=$0", err_code));
     } else {
-        LOG(INFO) << "make_snapshot success. tablet_id: " << snapshot_request.tablet_id
-                  << " schema_hash: " << snapshot_request.schema_hash << " snapshot_path: " << snapshot_path;
-        return_value.__set_snapshot_path(snapshot_path);
+        LOG(INFO) << "success to make_snapshot. tablet_id=" << snapshot_request.tablet_id
+                  << ", schema_hash=" << snapshot_request.schema_hash
+                  << ", snapshot_path: " << snapshot_path;
+        t_agent_result.__set_snapshot_path(snapshot_path);
     }
 
-    status.__set_error_msgs(error_msgs);
-    status.__set_status_code(status_code);
-    return_value.__set_status(status);
+    ret_st.to_thrift(&t_agent_result.status);
+    t_agent_result.__set_snapshot_version(snapshot_request.preferred_snapshot_version);
     if (snapshot_request.__isset.allow_incremental_clone) {
-        return_value.__set_allow_incremental_clone(snapshot_request.allow_incremental_clone);
+        t_agent_result.__set_allow_incremental_clone(snapshot_request.allow_incremental_clone);
     }
 }
 
-void AgentServer::release_snapshot(TAgentResult& return_value, const std::string& snapshot_path) {
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
-
-    OLAPStatus release_snapshot_status =
-            SnapshotManager::instance()->release_snapshot(snapshot_path);
-    if (release_snapshot_status != OLAP_SUCCESS) {
-        status_code = TStatusCode::RUNTIME_ERROR;
-        LOG(WARNING) << "release_snapshot failed. snapshot_path: " << snapshot_path << ", status: " << release_snapshot_status;
-        error_msgs.push_back("release_snapshot failed. status: " +
-                             boost::lexical_cast<string>(release_snapshot_status));
+void AgentServer::release_snapshot(TAgentResult& t_agent_result, const std::string& snapshot_path) {
+    Status ret_st;
+    OLAPStatus err_code = SnapshotManager::instance()->release_snapshot(snapshot_path);
+    if (err_code != OLAP_SUCCESS) {
+        LOG(WARNING) << "failt to release_snapshot. snapshot_path: " << snapshot_path
+                     << ", err_code: " << err_code;
+        ret_st = Status::RuntimeError(strings::Substitute(
+                    "fail to release_snapshot. err_code=$0", err_code));
     } else {
-        LOG(INFO) << "release_snapshot success. snapshot_path: " << snapshot_path << ", status: " << release_snapshot_status;
+        LOG(INFO) << "success to release_snapshot. snapshot_path=" << snapshot_path
+                  << ", err_code=" << err_code;
     }
-
-    return_value.status.__set_error_msgs(error_msgs);
-    return_value.status.__set_status_code(status_code);
+    ret_st.to_thrift(&t_agent_result.status);
 }
 
-void AgentServer::publish_cluster_state(TAgentResult& _return, const TAgentPublishRequest& request) {
-    vector<string> error_msgs;
+// TODO(lingbin): always return OK?
+void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

Posted by GitBox <gi...@apache.org>.
lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831#discussion_r374610078
 
 

 ##########
 File path: be/src/agent/agent_server.cpp
 ##########
 @@ -67,438 +47,236 @@ AgentServer::AgentServer(ExecEnv* exec_env,
                 boost::filesystem::remove_all(dpp_download_path);
             }
         } catch (...) {
-            LOG(WARNING) << "boost exception when remove dpp download path. path="
-                         << path.path;
+            LOG(WARNING) << "boost exception when remove dpp download path. path=" << path.path;
         }
     }
 
-    // init task worker pool
-    _create_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
-            _exec_env,
-            master_info);
-    _drop_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DROP_TABLE,
-            _exec_env,
-            master_info);
-    _push_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUSH,
-            _exec_env,
-            master_info);
-    _publish_version_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUBLISH_VERSION,
-            _exec_env,
-            master_info);
-    _clear_transaction_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLEAR_TRANSACTION_TASK,
-            exec_env,
-            master_info);
-    _delete_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DELETE,
-            _exec_env,
-            master_info);
-    _alter_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
-            _exec_env,
-            master_info);
-    _clone_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLONE,
-            _exec_env,
-            master_info);
-    _storage_medium_migrate_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::STORAGE_MEDIUM_MIGRATE,
-            _exec_env,
-            master_info);
-    _check_consistency_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CHECK_CONSISTENCY,
-            _exec_env,
-            master_info);
-    _report_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_TASK,
-            _exec_env,
-            master_info);
-    _report_disk_state_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE,
-            _exec_env,
-            master_info);
-    _report_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE,
-            _exec_env,
-            master_info);
-    _upload_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPLOAD,
-            _exec_env,
-            master_info);
-    _download_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DOWNLOAD,
-            _exec_env,
-            master_info);
-    _make_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _release_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _move_dir_workers= new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MOVE,
-            _exec_env,
-            master_info);
-    _recover_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RECOVER_TABLET,
-            _exec_env,
-            master_info);
-    _update_tablet_meta_info_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO,
-            _exec_env,
-            master_info);
+    // It is the same code to create workers of each type, so we use a macro
+    // to make code to be more readable.
+
+#ifndef BE_TEST
+#define CREATE_AND_START_POOL(type, pool_name)         \
+    pool_name.reset(new TaskWorkerPool(                \
+                TaskWorkerPool::TaskWorkerType::type,  \
+                _exec_env,                             \
+                master_info));                         \
+    pool_name->start();
+#else
+#define CREATE_AND_START_POOL(type, pool_name)
+#endif // BE_TEST
+
+    CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
+    CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
+    // Both PUSH and REALTIME_PUSH type use _push_workers
+    CREATE_AND_START_POOL(PUSH, _push_workers);
+    CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
+    CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers);
+    CREATE_AND_START_POOL(DELETE, _delete_workers);
+    CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
+    CREATE_AND_START_POOL(CLONE, _clone_workers);
+    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers);
+    CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
+    CREATE_AND_START_POOL(REPORT_TASK, _report_task_workers);
+    CREATE_AND_START_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
+    CREATE_AND_START_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
+    CREATE_AND_START_POOL(UPLOAD, _upload_workers);
+    CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
+    CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers);
+    CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
+    CREATE_AND_START_POOL(MOVE, _move_dir_workers);
+    CREATE_AND_START_POOL(RECOVER_TABLET, _recover_tablet_workers);
+    CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers);
+#undef CREATE_AND_START_POOL
+
 #ifndef BE_TEST
-    _create_tablet_workers->start();
-    _drop_tablet_workers->start();
-    _push_workers->start();
-    _publish_version_workers->start();
-    _clear_transaction_task_workers->start();
-    _delete_workers->start();
-    _alter_tablet_workers->start();
-    _clone_workers->start();
-    _storage_medium_migrate_workers->start();
-    _check_consistency_workers->start();
-    _report_task_workers->start();
-    _report_disk_state_workers->start();
-    _report_tablet_workers->start();
-    _upload_workers->start();
-    _download_workers->start();
-    _make_snapshot_workers->start();
-    _release_snapshot_workers->start();
-    _move_dir_workers->start();
-    _recover_tablet_workers->start();
-    _update_tablet_meta_info_workers->start();
     // Add subscriber here and register listeners
     TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
     LOG(INFO) << "Register user resource listener";
     _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener);
 #endif
 }
 
-AgentServer::~AgentServer() {
-    if (_create_tablet_workers != NULL) {
-        delete _create_tablet_workers;
-    }
-    if (_drop_tablet_workers != NULL) {
-        delete _drop_tablet_workers;
-    }
-    if (_push_workers != NULL) {
-        delete _push_workers;
-    }
-    if (_publish_version_workers != NULL) {
-        delete _publish_version_workers;
-    }
-    if (_clear_transaction_task_workers != NULL) {
-        delete _clear_transaction_task_workers;
-    }
-    if (_delete_workers != NULL) {
-        delete _delete_workers;
-    }
-    if (_alter_tablet_workers != NULL) {
-        delete _alter_tablet_workers;
-    }
-    if (_clone_workers != NULL) {
-        delete _clone_workers;
-    }
-    if (_storage_medium_migrate_workers != NULL) {
-        delete _storage_medium_migrate_workers;
-    }
-    if (_check_consistency_workers != NULL) {
-        delete _check_consistency_workers;
-    }
-    if (_report_task_workers != NULL) {
-        delete _report_task_workers;
-    }
-    if (_report_disk_state_workers != NULL) {
-        delete _report_disk_state_workers;
-    }
-    if (_report_tablet_workers != NULL) {
-        delete _report_tablet_workers;
-    }
-    if (_upload_workers != NULL) {
-        delete _upload_workers;
-    }
-    if (_download_workers != NULL) {
-        delete _download_workers;
-    }
-    if (_make_snapshot_workers != NULL) {
-        delete _make_snapshot_workers;
-    }
-    if (_move_dir_workers!= NULL) {
-        delete _move_dir_workers;
-    }
-    if (_recover_tablet_workers != NULL) {
-        delete _recover_tablet_workers;
-    }
-
-    if (_update_tablet_meta_info_workers != NULL) {
-        delete _update_tablet_meta_info_workers;
-    }
-    if (_release_snapshot_workers != NULL) {
-        delete _release_snapshot_workers;
-    }
-    if (_topic_subscriber !=NULL) {
-        delete _topic_subscriber;
-    }
-}
-
-void AgentServer::submit_tasks(
-        TAgentResult& return_value,
-        const vector<TAgentTaskRequest>& tasks) {
+AgentServer::~AgentServer() { }
 
-    // Set result to dm
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
+void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) {
+    Status ret_st;
 
-    // TODO check require master same to heartbeat master
-    if (_master_info.network_address.hostname == ""
-            || _master_info.network_address.port == 0) {
-        error_msgs.push_back("Not get master heartbeat yet.");
-        return_value.status.__set_error_msgs(error_msgs);
-        return_value.status.__set_status_code(TStatusCode::CANCELLED);
+    // TODO check master_info here if it is the same with that of heartbeat rpc
+    if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
+        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
+        ret_st.to_thrift(&agent_result.status);
         return;
     }
 
     for (auto task : tasks) {
+        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
         TTaskType::type task_type = task.task_type;
         int64_t signature = task.signature;
 
+#define HANDLE_TYPE(t_task_type, work_pool, req_member)                     \
+    case t_task_type:                                                       \
+        if (task.__isset.req_member) {                                      \
+            work_pool->submit_task(task);                                   \
+        }                                                                   \
+        ret_st = Status::InvalidArgument(strings::Substitute(               \
+                "task(signature=$0) has wrong request member", signature)); \
+        break;
+
+        // TODO(lingbin): divided these task types into several categories
         switch (task_type) {
-        case TTaskType::CREATE:
-            if (task.__isset.create_tablet_req) {
-               _create_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DROP:
-            if (task.__isset.drop_tablet_req) {
-                _drop_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
+        HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, create_tablet_req);
+        HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, drop_tablet_req);
+        HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, publish_version_req);
+        HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK,
+                    _clear_transaction_task_workers,
+                    clear_transaction_task_req);
+        HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
+        HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE,
+                    _storage_medium_migrate_workers,
+                    storage_medium_migrate_req);
+        HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY,
+                    _check_consistency_workers,
+                    check_consistency_req);
+        HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
+        HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
+        HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, snapshot_req);
+        HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers, release_snapshot_req);
+        HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
+        HANDLE_TYPE(TTaskType::RECOVER_TABLET, _recover_tablet_workers, recover_tablet_req);
+        HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO,
+                    _update_tablet_meta_info_workers,
+                    update_tablet_meta_info_req);
+
         case TTaskType::REALTIME_PUSH:
         case TTaskType::PUSH:
-            if (task.__isset.push_req) {
-                if (task.push_req.push_type == TPushType::LOAD
-                        || task.push_req.push_type == TPushType::LOAD_DELETE) {
-                    _push_workers->submit_task(task);
-                } else if (task.push_req.push_type == TPushType::DELETE) {
-                    _delete_workers->submit_task(task);
-                } else {
-                    status_code = TStatusCode::ANALYSIS_ERROR;
-                }
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+            if (!task.__isset.push_req) {
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
+                break;
             }
-            break;
-        case TTaskType::PUBLISH_VERSION:
-            if (task.__isset.publish_version_req) {
-                _publish_version_workers->submit_task(task);
+            if (task.push_req.push_type == TPushType::LOAD
+                    || task.push_req.push_type == TPushType::LOAD_DELETE) {
+                _push_workers->submit_task(task);
+            } else if (task.push_req.push_type == TPushType::DELETE) {
+                _delete_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLEAR_TRANSACTION_TASK:
-            if (task.__isset.clear_transaction_task_req) {
-                _clear_transaction_task_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0, type=$1, push_type=$2) has wrong push_type",
+                        signature, task_type, task.push_req.push_type));
             }
             break;
         case TTaskType::ALTER:
             if (task.__isset.alter_tablet_req || task.__isset.alter_tablet_req_v2) {
                 _alter_tablet_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLONE:
-            if (task.__isset.clone_req) {
-                _clone_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::STORAGE_MEDIUM_MIGRATE:
-            if (task.__isset.storage_medium_migrate_req) {
-                _storage_medium_migrate_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CHECK_CONSISTENCY:
-            if (task.__isset.check_consistency_req) {
-                _check_consistency_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPLOAD:
-            if (task.__isset.upload_req) {
-                _upload_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DOWNLOAD:
-            if (task.__isset.download_req) {
-                _download_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MAKE_SNAPSHOT:
-            if (task.__isset.snapshot_req) {
-                _make_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RELEASE_SNAPSHOT:
-            if (task.__isset.release_snapshot_req) {
-                _release_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MOVE:
-            if (task.__isset.move_dir_req) {
-                _move_dir_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RECOVER_TABLET:
-            if (task.__isset.recover_tablet_req) {
-                _recover_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPDATE_TABLET_META_INFO:
-            if (task.__isset.update_tablet_meta_info_req) {
-                _update_tablet_meta_info_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
             }
             break;
         default:
-            status_code = TStatusCode::ANALYSIS_ERROR;
+            ret_st = Status::InvalidArgument(strings::Substitute(
+                    "task(signature=$0, type=$1) has wrong task type", signature, task_type));
             break;
         }
+#undef HANDLE_TYPE
 
-        if (status_code == TStatusCode::ANALYSIS_ERROR) {
-            OLAP_LOG_WARNING("task anaysis_error, signature: %ld", signature);
-            error_msgs.push_back("the task signature is:" + to_string(signature) + " has wrong request.");
+        if (!ret_st.ok()) {
+            LOG(WARNING) << "fail to submit task. reason: " << ret_st.get_error_msg()
+                    << ", task: " << task;
+            break;
 
 Review comment:
   https://github.com/apache/incubator-doris/blob/a27e89065b692acd72ef2a1a5d616621923a3da3/fe/src/main/java/org/apache/doris/task/AgentBatchTask.java#L164-L168
   
   Oh, at present, the FE does not check the return value of `submit_tasks()`.
   
   But there should be no problem here, because this code only checks the mapping between `task_type` and its `req member`, which is very unlikely to be wrong. We can revisit this once a separate status for each task is added in the future.
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

Posted by GitBox <gi...@apache.org>.
lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831#discussion_r374600389
 
 

 ##########
 File path: be/src/agent/agent_server.cpp
 ##########
 @@ -67,438 +47,236 @@ AgentServer::AgentServer(ExecEnv* exec_env,
                 boost::filesystem::remove_all(dpp_download_path);
             }
         } catch (...) {
-            LOG(WARNING) << "boost exception when remove dpp download path. path="
-                         << path.path;
+            LOG(WARNING) << "boost exception when remove dpp download path. path=" << path.path;
         }
     }
 
-    // init task worker pool
-    _create_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
-            _exec_env,
-            master_info);
-    _drop_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DROP_TABLE,
-            _exec_env,
-            master_info);
-    _push_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUSH,
-            _exec_env,
-            master_info);
-    _publish_version_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUBLISH_VERSION,
-            _exec_env,
-            master_info);
-    _clear_transaction_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLEAR_TRANSACTION_TASK,
-            exec_env,
-            master_info);
-    _delete_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DELETE,
-            _exec_env,
-            master_info);
-    _alter_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
-            _exec_env,
-            master_info);
-    _clone_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLONE,
-            _exec_env,
-            master_info);
-    _storage_medium_migrate_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::STORAGE_MEDIUM_MIGRATE,
-            _exec_env,
-            master_info);
-    _check_consistency_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CHECK_CONSISTENCY,
-            _exec_env,
-            master_info);
-    _report_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_TASK,
-            _exec_env,
-            master_info);
-    _report_disk_state_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE,
-            _exec_env,
-            master_info);
-    _report_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE,
-            _exec_env,
-            master_info);
-    _upload_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPLOAD,
-            _exec_env,
-            master_info);
-    _download_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DOWNLOAD,
-            _exec_env,
-            master_info);
-    _make_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _release_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _move_dir_workers= new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MOVE,
-            _exec_env,
-            master_info);
-    _recover_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RECOVER_TABLET,
-            _exec_env,
-            master_info);
-    _update_tablet_meta_info_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO,
-            _exec_env,
-            master_info);
+    // It is the same code to create workers of each type, so we use a macro
+    // to make code to be more readable.
+
+#ifndef BE_TEST
+#define CREATE_AND_START_POOL(type, pool_name)         \
+    pool_name.reset(new TaskWorkerPool(                \
+                TaskWorkerPool::TaskWorkerType::type,  \
+                _exec_env,                             \
+                master_info));                         \
+    pool_name->start();
+#else
+#define CREATE_AND_START_POOL(type, pool_name)
+#endif // BE_TEST
+
+    CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
+    CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
+    // Both PUSH and REALTIME_PUSH type use _push_workers
+    CREATE_AND_START_POOL(PUSH, _push_workers);
+    CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
+    CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers);
+    CREATE_AND_START_POOL(DELETE, _delete_workers);
+    CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
+    CREATE_AND_START_POOL(CLONE, _clone_workers);
+    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers);
+    CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
+    CREATE_AND_START_POOL(REPORT_TASK, _report_task_workers);
+    CREATE_AND_START_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
+    CREATE_AND_START_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
+    CREATE_AND_START_POOL(UPLOAD, _upload_workers);
+    CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
+    CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers);
+    CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
+    CREATE_AND_START_POOL(MOVE, _move_dir_workers);
+    CREATE_AND_START_POOL(RECOVER_TABLET, _recover_tablet_workers);
+    CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers);
+#undef CREATE_AND_START_POOL
+
 #ifndef BE_TEST
-    _create_tablet_workers->start();
-    _drop_tablet_workers->start();
-    _push_workers->start();
-    _publish_version_workers->start();
-    _clear_transaction_task_workers->start();
-    _delete_workers->start();
-    _alter_tablet_workers->start();
-    _clone_workers->start();
-    _storage_medium_migrate_workers->start();
-    _check_consistency_workers->start();
-    _report_task_workers->start();
-    _report_disk_state_workers->start();
-    _report_tablet_workers->start();
-    _upload_workers->start();
-    _download_workers->start();
-    _make_snapshot_workers->start();
-    _release_snapshot_workers->start();
-    _move_dir_workers->start();
-    _recover_tablet_workers->start();
-    _update_tablet_meta_info_workers->start();
     // Add subscriber here and register listeners
     TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
     LOG(INFO) << "Register user resource listener";
     _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener);
 #endif
 }
 
-AgentServer::~AgentServer() {
-    if (_create_tablet_workers != NULL) {
-        delete _create_tablet_workers;
-    }
-    if (_drop_tablet_workers != NULL) {
-        delete _drop_tablet_workers;
-    }
-    if (_push_workers != NULL) {
-        delete _push_workers;
-    }
-    if (_publish_version_workers != NULL) {
-        delete _publish_version_workers;
-    }
-    if (_clear_transaction_task_workers != NULL) {
-        delete _clear_transaction_task_workers;
-    }
-    if (_delete_workers != NULL) {
-        delete _delete_workers;
-    }
-    if (_alter_tablet_workers != NULL) {
-        delete _alter_tablet_workers;
-    }
-    if (_clone_workers != NULL) {
-        delete _clone_workers;
-    }
-    if (_storage_medium_migrate_workers != NULL) {
-        delete _storage_medium_migrate_workers;
-    }
-    if (_check_consistency_workers != NULL) {
-        delete _check_consistency_workers;
-    }
-    if (_report_task_workers != NULL) {
-        delete _report_task_workers;
-    }
-    if (_report_disk_state_workers != NULL) {
-        delete _report_disk_state_workers;
-    }
-    if (_report_tablet_workers != NULL) {
-        delete _report_tablet_workers;
-    }
-    if (_upload_workers != NULL) {
-        delete _upload_workers;
-    }
-    if (_download_workers != NULL) {
-        delete _download_workers;
-    }
-    if (_make_snapshot_workers != NULL) {
-        delete _make_snapshot_workers;
-    }
-    if (_move_dir_workers!= NULL) {
-        delete _move_dir_workers;
-    }
-    if (_recover_tablet_workers != NULL) {
-        delete _recover_tablet_workers;
-    }
-
-    if (_update_tablet_meta_info_workers != NULL) {
-        delete _update_tablet_meta_info_workers;
-    }
-    if (_release_snapshot_workers != NULL) {
-        delete _release_snapshot_workers;
-    }
-    if (_topic_subscriber !=NULL) {
-        delete _topic_subscriber;
-    }
-}
-
-void AgentServer::submit_tasks(
-        TAgentResult& return_value,
-        const vector<TAgentTaskRequest>& tasks) {
+AgentServer::~AgentServer() { }
 
-    // Set result to dm
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
+void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) {
+    Status ret_st;
 
-    // TODO check require master same to heartbeat master
-    if (_master_info.network_address.hostname == ""
-            || _master_info.network_address.port == 0) {
-        error_msgs.push_back("Not get master heartbeat yet.");
-        return_value.status.__set_error_msgs(error_msgs);
-        return_value.status.__set_status_code(TStatusCode::CANCELLED);
+    // TODO check master_info here if it is the same with that of heartbeat rpc
+    if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
+        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
+        ret_st.to_thrift(&agent_result.status);
         return;
     }
 
     for (auto task : tasks) {
+        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
         TTaskType::type task_type = task.task_type;
         int64_t signature = task.signature;
 
+#define HANDLE_TYPE(t_task_type, work_pool, req_member)                     \
+    case t_task_type:                                                       \
+        if (task.__isset.req_member) {                                      \
+            work_pool->submit_task(task);                                   \
+        }                                                                   \
+        ret_st = Status::InvalidArgument(strings::Substitute(               \
+                "task(signature=$0) has wrong request member", signature)); \
+        break;
+
+        // TODO(lingbin): divided these task types into several categories
         switch (task_type) {
-        case TTaskType::CREATE:
-            if (task.__isset.create_tablet_req) {
-               _create_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DROP:
-            if (task.__isset.drop_tablet_req) {
-                _drop_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
+        HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, create_tablet_req);
+        HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, drop_tablet_req);
+        HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, publish_version_req);
+        HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK,
+                    _clear_transaction_task_workers,
+                    clear_transaction_task_req);
+        HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
+        HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE,
+                    _storage_medium_migrate_workers,
+                    storage_medium_migrate_req);
+        HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY,
+                    _check_consistency_workers,
+                    check_consistency_req);
+        HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
+        HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
+        HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, snapshot_req);
+        HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers, release_snapshot_req);
+        HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
+        HANDLE_TYPE(TTaskType::RECOVER_TABLET, _recover_tablet_workers, recover_tablet_req);
+        HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO,
+                    _update_tablet_meta_info_workers,
+                    update_tablet_meta_info_req);
+
         case TTaskType::REALTIME_PUSH:
         case TTaskType::PUSH:
-            if (task.__isset.push_req) {
-                if (task.push_req.push_type == TPushType::LOAD
-                        || task.push_req.push_type == TPushType::LOAD_DELETE) {
-                    _push_workers->submit_task(task);
-                } else if (task.push_req.push_type == TPushType::DELETE) {
-                    _delete_workers->submit_task(task);
-                } else {
-                    status_code = TStatusCode::ANALYSIS_ERROR;
-                }
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+            if (!task.__isset.push_req) {
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
+                break;
             }
-            break;
-        case TTaskType::PUBLISH_VERSION:
-            if (task.__isset.publish_version_req) {
-                _publish_version_workers->submit_task(task);
+            if (task.push_req.push_type == TPushType::LOAD
+                    || task.push_req.push_type == TPushType::LOAD_DELETE) {
+                _push_workers->submit_task(task);
+            } else if (task.push_req.push_type == TPushType::DELETE) {
+                _delete_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLEAR_TRANSACTION_TASK:
-            if (task.__isset.clear_transaction_task_req) {
-                _clear_transaction_task_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0, type=$1, push_type=$2) has wrong push_type",
+                        signature, task_type, task.push_req.push_type));
             }
             break;
         case TTaskType::ALTER:
             if (task.__isset.alter_tablet_req || task.__isset.alter_tablet_req_v2) {
                 _alter_tablet_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLONE:
-            if (task.__isset.clone_req) {
-                _clone_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::STORAGE_MEDIUM_MIGRATE:
-            if (task.__isset.storage_medium_migrate_req) {
-                _storage_medium_migrate_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CHECK_CONSISTENCY:
-            if (task.__isset.check_consistency_req) {
-                _check_consistency_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPLOAD:
-            if (task.__isset.upload_req) {
-                _upload_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DOWNLOAD:
-            if (task.__isset.download_req) {
-                _download_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MAKE_SNAPSHOT:
-            if (task.__isset.snapshot_req) {
-                _make_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RELEASE_SNAPSHOT:
-            if (task.__isset.release_snapshot_req) {
-                _release_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MOVE:
-            if (task.__isset.move_dir_req) {
-                _move_dir_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RECOVER_TABLET:
-            if (task.__isset.recover_tablet_req) {
-                _recover_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPDATE_TABLET_META_INFO:
-            if (task.__isset.update_tablet_meta_info_req) {
-                _update_tablet_meta_info_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
             }
             break;
         default:
-            status_code = TStatusCode::ANALYSIS_ERROR;
+            ret_st = Status::InvalidArgument(strings::Substitute(
+                    "task(signature=$0, type=$1) has wrong task type", signature, task_type));
             break;
         }
+#undef HANDLE_TYPE
 
-        if (status_code == TStatusCode::ANALYSIS_ERROR) {
-            OLAP_LOG_WARNING("task anaysis_error, signature: %ld", signature);
-            error_msgs.push_back("the task signature is:" + to_string(signature) + " has wrong request.");
+        if (!ret_st.ok()) {
+            LOG(WARNING) << "fail to submit task. reason: " << ret_st.get_error_msg()
+                    << ", task: " << task;
+            break;
 
 Review comment:
   If something goes wrong, doesn't the FE resend the requests?
   
   An ideal solution would be for each task in a batch to have its own status, rather than the whole batch sharing a single status, but this is something that can be improved in the future.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831#discussion_r374579963
 
 

 ##########
 File path: be/src/agent/agent_server.cpp
 ##########
 @@ -67,438 +47,236 @@ AgentServer::AgentServer(ExecEnv* exec_env,
                 boost::filesystem::remove_all(dpp_download_path);
             }
         } catch (...) {
-            LOG(WARNING) << "boost exception when remove dpp download path. path="
-                         << path.path;
+            LOG(WARNING) << "boost exception when remove dpp download path. path=" << path.path;
         }
     }
 
-    // init task worker pool
-    _create_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
-            _exec_env,
-            master_info);
-    _drop_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DROP_TABLE,
-            _exec_env,
-            master_info);
-    _push_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUSH,
-            _exec_env,
-            master_info);
-    _publish_version_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUBLISH_VERSION,
-            _exec_env,
-            master_info);
-    _clear_transaction_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLEAR_TRANSACTION_TASK,
-            exec_env,
-            master_info);
-    _delete_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DELETE,
-            _exec_env,
-            master_info);
-    _alter_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
-            _exec_env,
-            master_info);
-    _clone_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLONE,
-            _exec_env,
-            master_info);
-    _storage_medium_migrate_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::STORAGE_MEDIUM_MIGRATE,
-            _exec_env,
-            master_info);
-    _check_consistency_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CHECK_CONSISTENCY,
-            _exec_env,
-            master_info);
-    _report_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_TASK,
-            _exec_env,
-            master_info);
-    _report_disk_state_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE,
-            _exec_env,
-            master_info);
-    _report_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE,
-            _exec_env,
-            master_info);
-    _upload_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPLOAD,
-            _exec_env,
-            master_info);
-    _download_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DOWNLOAD,
-            _exec_env,
-            master_info);
-    _make_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _release_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _move_dir_workers= new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MOVE,
-            _exec_env,
-            master_info);
-    _recover_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RECOVER_TABLET,
-            _exec_env,
-            master_info);
-    _update_tablet_meta_info_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO,
-            _exec_env,
-            master_info);
+    // It is the same code to create workers of each type, so we use a macro
+    // to make code to be more readable.
+
+#ifndef BE_TEST
+#define CREATE_AND_START_POOL(type, pool_name)         \
+    pool_name.reset(new TaskWorkerPool(                \
+                TaskWorkerPool::TaskWorkerType::type,  \
+                _exec_env,                             \
+                master_info));                         \
+    pool_name->start();
+#else
+#define CREATE_AND_START_POOL(type, pool_name)
+#endif // BE_TEST
+
+    CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
+    CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
+    // Both PUSH and REALTIME_PUSH type use _push_workers
+    CREATE_AND_START_POOL(PUSH, _push_workers);
+    CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
+    CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers);
+    CREATE_AND_START_POOL(DELETE, _delete_workers);
+    CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
+    CREATE_AND_START_POOL(CLONE, _clone_workers);
+    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers);
+    CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
+    CREATE_AND_START_POOL(REPORT_TASK, _report_task_workers);
+    CREATE_AND_START_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
+    CREATE_AND_START_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
+    CREATE_AND_START_POOL(UPLOAD, _upload_workers);
+    CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
+    CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers);
+    CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
+    CREATE_AND_START_POOL(MOVE, _move_dir_workers);
+    CREATE_AND_START_POOL(RECOVER_TABLET, _recover_tablet_workers);
+    CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers);
+#undef CREATE_AND_START_POOL
+
 #ifndef BE_TEST
-    _create_tablet_workers->start();
-    _drop_tablet_workers->start();
-    _push_workers->start();
-    _publish_version_workers->start();
-    _clear_transaction_task_workers->start();
-    _delete_workers->start();
-    _alter_tablet_workers->start();
-    _clone_workers->start();
-    _storage_medium_migrate_workers->start();
-    _check_consistency_workers->start();
-    _report_task_workers->start();
-    _report_disk_state_workers->start();
-    _report_tablet_workers->start();
-    _upload_workers->start();
-    _download_workers->start();
-    _make_snapshot_workers->start();
-    _release_snapshot_workers->start();
-    _move_dir_workers->start();
-    _recover_tablet_workers->start();
-    _update_tablet_meta_info_workers->start();
     // Add subscriber here and register listeners
     TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
     LOG(INFO) << "Register user resource listener";
     _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener);
 #endif
 }
 
-AgentServer::~AgentServer() {
-    if (_create_tablet_workers != NULL) {
-        delete _create_tablet_workers;
-    }
-    if (_drop_tablet_workers != NULL) {
-        delete _drop_tablet_workers;
-    }
-    if (_push_workers != NULL) {
-        delete _push_workers;
-    }
-    if (_publish_version_workers != NULL) {
-        delete _publish_version_workers;
-    }
-    if (_clear_transaction_task_workers != NULL) {
-        delete _clear_transaction_task_workers;
-    }
-    if (_delete_workers != NULL) {
-        delete _delete_workers;
-    }
-    if (_alter_tablet_workers != NULL) {
-        delete _alter_tablet_workers;
-    }
-    if (_clone_workers != NULL) {
-        delete _clone_workers;
-    }
-    if (_storage_medium_migrate_workers != NULL) {
-        delete _storage_medium_migrate_workers;
-    }
-    if (_check_consistency_workers != NULL) {
-        delete _check_consistency_workers;
-    }
-    if (_report_task_workers != NULL) {
-        delete _report_task_workers;
-    }
-    if (_report_disk_state_workers != NULL) {
-        delete _report_disk_state_workers;
-    }
-    if (_report_tablet_workers != NULL) {
-        delete _report_tablet_workers;
-    }
-    if (_upload_workers != NULL) {
-        delete _upload_workers;
-    }
-    if (_download_workers != NULL) {
-        delete _download_workers;
-    }
-    if (_make_snapshot_workers != NULL) {
-        delete _make_snapshot_workers;
-    }
-    if (_move_dir_workers!= NULL) {
-        delete _move_dir_workers;
-    }
-    if (_recover_tablet_workers != NULL) {
-        delete _recover_tablet_workers;
-    }
-
-    if (_update_tablet_meta_info_workers != NULL) {
-        delete _update_tablet_meta_info_workers;
-    }
-    if (_release_snapshot_workers != NULL) {
-        delete _release_snapshot_workers;
-    }
-    if (_topic_subscriber !=NULL) {
-        delete _topic_subscriber;
-    }
-}
-
-void AgentServer::submit_tasks(
-        TAgentResult& return_value,
-        const vector<TAgentTaskRequest>& tasks) {
+AgentServer::~AgentServer() { }
 
-    // Set result to dm
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
+void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) {
+    Status ret_st;
 
-    // TODO check require master same to heartbeat master
-    if (_master_info.network_address.hostname == ""
-            || _master_info.network_address.port == 0) {
-        error_msgs.push_back("Not get master heartbeat yet.");
-        return_value.status.__set_error_msgs(error_msgs);
-        return_value.status.__set_status_code(TStatusCode::CANCELLED);
+    // TODO check master_info here if it is the same with that of heartbeat rpc
+    if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
+        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
+        ret_st.to_thrift(&agent_result.status);
         return;
     }
 
     for (auto task : tasks) {
+        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
         TTaskType::type task_type = task.task_type;
         int64_t signature = task.signature;
 
+#define HANDLE_TYPE(t_task_type, work_pool, req_member)                     \
+    case t_task_type:                                                       \
+        if (task.__isset.req_member) {                                      \
+            work_pool->submit_task(task);                                   \
+        }                                                                   \
 
 Review comment:
   Missing `else`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman merged pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

Posted by GitBox <gi...@apache.org>.
morningman merged pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831#discussion_r374583286
 
 

 ##########
 File path: be/src/agent/agent_server.cpp
 ##########
 @@ -67,438 +47,236 @@ AgentServer::AgentServer(ExecEnv* exec_env,
                 boost::filesystem::remove_all(dpp_download_path);
             }
         } catch (...) {
-            LOG(WARNING) << "boost exception when remove dpp download path. path="
-                         << path.path;
+            LOG(WARNING) << "boost exception when remove dpp download path. path=" << path.path;
         }
     }
 
-    // init task worker pool
-    _create_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
-            _exec_env,
-            master_info);
-    _drop_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DROP_TABLE,
-            _exec_env,
-            master_info);
-    _push_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUSH,
-            _exec_env,
-            master_info);
-    _publish_version_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUBLISH_VERSION,
-            _exec_env,
-            master_info);
-    _clear_transaction_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLEAR_TRANSACTION_TASK,
-            exec_env,
-            master_info);
-    _delete_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DELETE,
-            _exec_env,
-            master_info);
-    _alter_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
-            _exec_env,
-            master_info);
-    _clone_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLONE,
-            _exec_env,
-            master_info);
-    _storage_medium_migrate_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::STORAGE_MEDIUM_MIGRATE,
-            _exec_env,
-            master_info);
-    _check_consistency_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CHECK_CONSISTENCY,
-            _exec_env,
-            master_info);
-    _report_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_TASK,
-            _exec_env,
-            master_info);
-    _report_disk_state_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE,
-            _exec_env,
-            master_info);
-    _report_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE,
-            _exec_env,
-            master_info);
-    _upload_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPLOAD,
-            _exec_env,
-            master_info);
-    _download_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DOWNLOAD,
-            _exec_env,
-            master_info);
-    _make_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _release_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _move_dir_workers= new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MOVE,
-            _exec_env,
-            master_info);
-    _recover_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RECOVER_TABLET,
-            _exec_env,
-            master_info);
-    _update_tablet_meta_info_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO,
-            _exec_env,
-            master_info);
+    // It is the same code to create workers of each type, so we use a macro
+    // to make code to be more readable.
+
+#ifndef BE_TEST
+#define CREATE_AND_START_POOL(type, pool_name)         \
+    pool_name.reset(new TaskWorkerPool(                \
+                TaskWorkerPool::TaskWorkerType::type,  \
+                _exec_env,                             \
+                master_info));                         \
+    pool_name->start();
+#else
+#define CREATE_AND_START_POOL(type, pool_name)
+#endif // BE_TEST
+
+    CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
+    CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
+    // Both PUSH and REALTIME_PUSH type use _push_workers
+    CREATE_AND_START_POOL(PUSH, _push_workers);
+    CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
+    CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers);
+    CREATE_AND_START_POOL(DELETE, _delete_workers);
+    CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
+    CREATE_AND_START_POOL(CLONE, _clone_workers);
+    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers);
+    CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
+    CREATE_AND_START_POOL(REPORT_TASK, _report_task_workers);
+    CREATE_AND_START_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
+    CREATE_AND_START_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
+    CREATE_AND_START_POOL(UPLOAD, _upload_workers);
+    CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
+    CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers);
+    CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
+    CREATE_AND_START_POOL(MOVE, _move_dir_workers);
+    CREATE_AND_START_POOL(RECOVER_TABLET, _recover_tablet_workers);
+    CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers);
+#undef CREATE_AND_START_POOL
+
 #ifndef BE_TEST
-    _create_tablet_workers->start();
-    _drop_tablet_workers->start();
-    _push_workers->start();
-    _publish_version_workers->start();
-    _clear_transaction_task_workers->start();
-    _delete_workers->start();
-    _alter_tablet_workers->start();
-    _clone_workers->start();
-    _storage_medium_migrate_workers->start();
-    _check_consistency_workers->start();
-    _report_task_workers->start();
-    _report_disk_state_workers->start();
-    _report_tablet_workers->start();
-    _upload_workers->start();
-    _download_workers->start();
-    _make_snapshot_workers->start();
-    _release_snapshot_workers->start();
-    _move_dir_workers->start();
-    _recover_tablet_workers->start();
-    _update_tablet_meta_info_workers->start();
     // Add subscriber here and register listeners
     TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
     LOG(INFO) << "Register user resource listener";
     _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener);
 #endif
 }
 
-AgentServer::~AgentServer() {
-    if (_create_tablet_workers != NULL) {
-        delete _create_tablet_workers;
-    }
-    if (_drop_tablet_workers != NULL) {
-        delete _drop_tablet_workers;
-    }
-    if (_push_workers != NULL) {
-        delete _push_workers;
-    }
-    if (_publish_version_workers != NULL) {
-        delete _publish_version_workers;
-    }
-    if (_clear_transaction_task_workers != NULL) {
-        delete _clear_transaction_task_workers;
-    }
-    if (_delete_workers != NULL) {
-        delete _delete_workers;
-    }
-    if (_alter_tablet_workers != NULL) {
-        delete _alter_tablet_workers;
-    }
-    if (_clone_workers != NULL) {
-        delete _clone_workers;
-    }
-    if (_storage_medium_migrate_workers != NULL) {
-        delete _storage_medium_migrate_workers;
-    }
-    if (_check_consistency_workers != NULL) {
-        delete _check_consistency_workers;
-    }
-    if (_report_task_workers != NULL) {
-        delete _report_task_workers;
-    }
-    if (_report_disk_state_workers != NULL) {
-        delete _report_disk_state_workers;
-    }
-    if (_report_tablet_workers != NULL) {
-        delete _report_tablet_workers;
-    }
-    if (_upload_workers != NULL) {
-        delete _upload_workers;
-    }
-    if (_download_workers != NULL) {
-        delete _download_workers;
-    }
-    if (_make_snapshot_workers != NULL) {
-        delete _make_snapshot_workers;
-    }
-    if (_move_dir_workers!= NULL) {
-        delete _move_dir_workers;
-    }
-    if (_recover_tablet_workers != NULL) {
-        delete _recover_tablet_workers;
-    }
-
-    if (_update_tablet_meta_info_workers != NULL) {
-        delete _update_tablet_meta_info_workers;
-    }
-    if (_release_snapshot_workers != NULL) {
-        delete _release_snapshot_workers;
-    }
-    if (_topic_subscriber !=NULL) {
-        delete _topic_subscriber;
-    }
-}
-
-void AgentServer::submit_tasks(
-        TAgentResult& return_value,
-        const vector<TAgentTaskRequest>& tasks) {
+AgentServer::~AgentServer() { }
 
-    // Set result to dm
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
+void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) {
+    Status ret_st;
 
-    // TODO check require master same to heartbeat master
-    if (_master_info.network_address.hostname == ""
-            || _master_info.network_address.port == 0) {
-        error_msgs.push_back("Not get master heartbeat yet.");
-        return_value.status.__set_error_msgs(error_msgs);
-        return_value.status.__set_status_code(TStatusCode::CANCELLED);
+    // TODO check master_info here if it is the same with that of heartbeat rpc
+    if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
+        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
+        ret_st.to_thrift(&agent_result.status);
         return;
     }
 
     for (auto task : tasks) {
+        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
         TTaskType::type task_type = task.task_type;
         int64_t signature = task.signature;
 
+#define HANDLE_TYPE(t_task_type, work_pool, req_member)                     \
+    case t_task_type:                                                       \
+        if (task.__isset.req_member) {                                      \
+            work_pool->submit_task(task);                                   \
+        }                                                                   \
+        ret_st = Status::InvalidArgument(strings::Substitute(               \
+                "task(signature=$0) has wrong request member", signature)); \
+        break;
+
+        // TODO(lingbin): divided these task types into several categories
         switch (task_type) {
-        case TTaskType::CREATE:
-            if (task.__isset.create_tablet_req) {
-               _create_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DROP:
-            if (task.__isset.drop_tablet_req) {
-                _drop_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
+        HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, create_tablet_req);
+        HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, drop_tablet_req);
+        HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, publish_version_req);
+        HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK,
+                    _clear_transaction_task_workers,
+                    clear_transaction_task_req);
+        HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
+        HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE,
+                    _storage_medium_migrate_workers,
+                    storage_medium_migrate_req);
+        HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY,
+                    _check_consistency_workers,
+                    check_consistency_req);
+        HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
+        HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
+        HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, snapshot_req);
+        HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers, release_snapshot_req);
+        HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
+        HANDLE_TYPE(TTaskType::RECOVER_TABLET, _recover_tablet_workers, recover_tablet_req);
+        HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO,
+                    _update_tablet_meta_info_workers,
+                    update_tablet_meta_info_req);
+
         case TTaskType::REALTIME_PUSH:
         case TTaskType::PUSH:
-            if (task.__isset.push_req) {
-                if (task.push_req.push_type == TPushType::LOAD
-                        || task.push_req.push_type == TPushType::LOAD_DELETE) {
-                    _push_workers->submit_task(task);
-                } else if (task.push_req.push_type == TPushType::DELETE) {
-                    _delete_workers->submit_task(task);
-                } else {
-                    status_code = TStatusCode::ANALYSIS_ERROR;
-                }
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+            if (!task.__isset.push_req) {
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
+                break;
             }
-            break;
-        case TTaskType::PUBLISH_VERSION:
-            if (task.__isset.publish_version_req) {
-                _publish_version_workers->submit_task(task);
+            if (task.push_req.push_type == TPushType::LOAD
+                    || task.push_req.push_type == TPushType::LOAD_DELETE) {
+                _push_workers->submit_task(task);
+            } else if (task.push_req.push_type == TPushType::DELETE) {
+                _delete_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLEAR_TRANSACTION_TASK:
-            if (task.__isset.clear_transaction_task_req) {
-                _clear_transaction_task_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0, type=$1, push_type=$2) has wrong push_type",
+                        signature, task_type, task.push_req.push_type));
             }
             break;
         case TTaskType::ALTER:
             if (task.__isset.alter_tablet_req || task.__isset.alter_tablet_req_v2) {
                 _alter_tablet_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLONE:
-            if (task.__isset.clone_req) {
-                _clone_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::STORAGE_MEDIUM_MIGRATE:
-            if (task.__isset.storage_medium_migrate_req) {
-                _storage_medium_migrate_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CHECK_CONSISTENCY:
-            if (task.__isset.check_consistency_req) {
-                _check_consistency_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPLOAD:
-            if (task.__isset.upload_req) {
-                _upload_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DOWNLOAD:
-            if (task.__isset.download_req) {
-                _download_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MAKE_SNAPSHOT:
-            if (task.__isset.snapshot_req) {
-                _make_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RELEASE_SNAPSHOT:
-            if (task.__isset.release_snapshot_req) {
-                _release_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MOVE:
-            if (task.__isset.move_dir_req) {
-                _move_dir_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RECOVER_TABLET:
-            if (task.__isset.recover_tablet_req) {
-                _recover_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPDATE_TABLET_META_INFO:
-            if (task.__isset.update_tablet_meta_info_req) {
-                _update_tablet_meta_info_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
             }
             break;
         default:
-            status_code = TStatusCode::ANALYSIS_ERROR;
+            ret_st = Status::InvalidArgument(strings::Substitute(
+                    "task(signature=$0, type=$1) has wrong task type", signature, task_type));
             break;
         }
+#undef HANDLE_TYPE
 
-        if (status_code == TStatusCode::ANALYSIS_ERROR) {
-            OLAP_LOG_WARNING("task anaysis_error, signature: %ld", signature);
-            error_msgs.push_back("the task signature is:" + to_string(signature) + " has wrong request.");
+        if (!ret_st.ok()) {
+            LOG(WARNING) << "fail to submit task. reason: " << ret_st.get_error_msg()
+                    << ", task: " << task;
+            break;
         }
     }
 
-    return_value.status.__set_error_msgs(error_msgs);
-    return_value.status.__set_status_code(status_code);
+    ret_st.to_thrift(&agent_result.status);
 }
 
-void AgentServer::make_snapshot(TAgentResult& return_value,
-        const TSnapshotRequest& snapshot_request) {
-    TStatus status;
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
-    return_value.__set_snapshot_version(snapshot_request.preferred_snapshot_version);
+void AgentServer::make_snapshot(TAgentResult& t_agent_result,
+                                const TSnapshotRequest& snapshot_request) {
+    Status ret_st;
     string snapshot_path;
-    OLAPStatus make_snapshot_status =
+    OLAPStatus err_code =
             SnapshotManager::instance()->make_snapshot(snapshot_request, &snapshot_path);
-    if (make_snapshot_status != OLAP_SUCCESS) {
-        status_code = TStatusCode::RUNTIME_ERROR;
-        OLAP_LOG_WARNING("make_snapshot failed. tablet_id: %ld, schema_hash: %ld, status: %d",
-                         snapshot_request.tablet_id, snapshot_request.schema_hash,
-                         make_snapshot_status);
-        error_msgs.push_back("make_snapshot failed. status: " +
-                             boost::lexical_cast<string>(make_snapshot_status));
+    if (err_code != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to make_snapshot. tablet_id=" << snapshot_request.tablet_id
+                     << ", schema_hash=" << snapshot_request.schema_hash
+                     << ", error_code=" << err_code;
+        ret_st = Status::RuntimeError(strings::Substitute(
+                    "fail to make_snapshot. err_code=$0", err_code));
     } else {
-        LOG(INFO) << "make_snapshot success. tablet_id: " << snapshot_request.tablet_id
-                  << " schema_hash: " << snapshot_request.schema_hash << " snapshot_path: " << snapshot_path;
-        return_value.__set_snapshot_path(snapshot_path);
+        LOG(INFO) << "success to make_snapshot. tablet_id=" << snapshot_request.tablet_id
+                  << ", schema_hash=" << snapshot_request.schema_hash
+                  << ", snapshot_path: " << snapshot_path;
+        t_agent_result.__set_snapshot_path(snapshot_path);
     }
 
-    status.__set_error_msgs(error_msgs);
-    status.__set_status_code(status_code);
-    return_value.__set_status(status);
+    ret_st.to_thrift(&t_agent_result.status);
+    t_agent_result.__set_snapshot_version(snapshot_request.preferred_snapshot_version);
     if (snapshot_request.__isset.allow_incremental_clone) {
-        return_value.__set_allow_incremental_clone(snapshot_request.allow_incremental_clone);
+        t_agent_result.__set_allow_incremental_clone(snapshot_request.allow_incremental_clone);
     }
 }
 
-void AgentServer::release_snapshot(TAgentResult& return_value, const std::string& snapshot_path) {
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
-
-    OLAPStatus release_snapshot_status =
-            SnapshotManager::instance()->release_snapshot(snapshot_path);
-    if (release_snapshot_status != OLAP_SUCCESS) {
-        status_code = TStatusCode::RUNTIME_ERROR;
-        LOG(WARNING) << "release_snapshot failed. snapshot_path: " << snapshot_path << ", status: " << release_snapshot_status;
-        error_msgs.push_back("release_snapshot failed. status: " +
-                             boost::lexical_cast<string>(release_snapshot_status));
+void AgentServer::release_snapshot(TAgentResult& t_agent_result, const std::string& snapshot_path) {
+    Status ret_st;
+    OLAPStatus err_code = SnapshotManager::instance()->release_snapshot(snapshot_path);
+    if (err_code != OLAP_SUCCESS) {
+        LOG(WARNING) << "failt to release_snapshot. snapshot_path: " << snapshot_path
+                     << ", err_code: " << err_code;
+        ret_st = Status::RuntimeError(strings::Substitute(
+                    "fail to release_snapshot. err_code=$0", err_code));
     } else {
-        LOG(INFO) << "release_snapshot success. snapshot_path: " << snapshot_path << ", status: " << release_snapshot_status;
+        LOG(INFO) << "success to release_snapshot. snapshot_path=" << snapshot_path
+                  << ", err_code=" << err_code;
     }
-
-    return_value.status.__set_error_msgs(error_msgs);
-    return_value.status.__set_status_code(status_code);
+    ret_st.to_thrift(&t_agent_result.status);
 }
 
-void AgentServer::publish_cluster_state(TAgentResult& _return, const TAgentPublishRequest& request) {
-    vector<string> error_msgs;
+// TODO(lingbin): always return OK?
+void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,
 
 Review comment:
   The following 4 methods are useless now. I think you can just make them return ERROR, and delete them afterwards.
   ```
   publish_cluster_state
   submit_etl_task
   get_etl_status
   delete_etl_files
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831#discussion_r374581038
 
 

 ##########
 File path: be/src/agent/agent_server.cpp
 ##########
 @@ -67,438 +47,236 @@ AgentServer::AgentServer(ExecEnv* exec_env,
                 boost::filesystem::remove_all(dpp_download_path);
             }
         } catch (...) {
-            LOG(WARNING) << "boost exception when remove dpp download path. path="
-                         << path.path;
+            LOG(WARNING) << "boost exception when remove dpp download path. path=" << path.path;
         }
     }
 
-    // init task worker pool
-    _create_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
-            _exec_env,
-            master_info);
-    _drop_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DROP_TABLE,
-            _exec_env,
-            master_info);
-    _push_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUSH,
-            _exec_env,
-            master_info);
-    _publish_version_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUBLISH_VERSION,
-            _exec_env,
-            master_info);
-    _clear_transaction_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLEAR_TRANSACTION_TASK,
-            exec_env,
-            master_info);
-    _delete_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DELETE,
-            _exec_env,
-            master_info);
-    _alter_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
-            _exec_env,
-            master_info);
-    _clone_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLONE,
-            _exec_env,
-            master_info);
-    _storage_medium_migrate_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::STORAGE_MEDIUM_MIGRATE,
-            _exec_env,
-            master_info);
-    _check_consistency_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CHECK_CONSISTENCY,
-            _exec_env,
-            master_info);
-    _report_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_TASK,
-            _exec_env,
-            master_info);
-    _report_disk_state_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE,
-            _exec_env,
-            master_info);
-    _report_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE,
-            _exec_env,
-            master_info);
-    _upload_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPLOAD,
-            _exec_env,
-            master_info);
-    _download_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DOWNLOAD,
-            _exec_env,
-            master_info);
-    _make_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _release_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _move_dir_workers= new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MOVE,
-            _exec_env,
-            master_info);
-    _recover_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RECOVER_TABLET,
-            _exec_env,
-            master_info);
-    _update_tablet_meta_info_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO,
-            _exec_env,
-            master_info);
+    // It is the same code to create workers of each type, so we use a macro
+    // to make code to be more readable.
+
+#ifndef BE_TEST
+#define CREATE_AND_START_POOL(type, pool_name)         \
+    pool_name.reset(new TaskWorkerPool(                \
+                TaskWorkerPool::TaskWorkerType::type,  \
+                _exec_env,                             \
+                master_info));                         \
+    pool_name->start();
+#else
+#define CREATE_AND_START_POOL(type, pool_name)
+#endif // BE_TEST
+
+    CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
+    CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
+    // Both PUSH and REALTIME_PUSH type use _push_workers
+    CREATE_AND_START_POOL(PUSH, _push_workers);
+    CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
+    CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers);
+    CREATE_AND_START_POOL(DELETE, _delete_workers);
+    CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
+    CREATE_AND_START_POOL(CLONE, _clone_workers);
+    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers);
+    CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
+    CREATE_AND_START_POOL(REPORT_TASK, _report_task_workers);
+    CREATE_AND_START_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
+    CREATE_AND_START_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
+    CREATE_AND_START_POOL(UPLOAD, _upload_workers);
+    CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
+    CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers);
+    CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
+    CREATE_AND_START_POOL(MOVE, _move_dir_workers);
+    CREATE_AND_START_POOL(RECOVER_TABLET, _recover_tablet_workers);
+    CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers);
+#undef CREATE_AND_START_POOL
+
 #ifndef BE_TEST
-    _create_tablet_workers->start();
-    _drop_tablet_workers->start();
-    _push_workers->start();
-    _publish_version_workers->start();
-    _clear_transaction_task_workers->start();
-    _delete_workers->start();
-    _alter_tablet_workers->start();
-    _clone_workers->start();
-    _storage_medium_migrate_workers->start();
-    _check_consistency_workers->start();
-    _report_task_workers->start();
-    _report_disk_state_workers->start();
-    _report_tablet_workers->start();
-    _upload_workers->start();
-    _download_workers->start();
-    _make_snapshot_workers->start();
-    _release_snapshot_workers->start();
-    _move_dir_workers->start();
-    _recover_tablet_workers->start();
-    _update_tablet_meta_info_workers->start();
     // Add subscriber here and register listeners
     TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
     LOG(INFO) << "Register user resource listener";
     _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener);
 #endif
 }
 
-AgentServer::~AgentServer() {
-    if (_create_tablet_workers != NULL) {
-        delete _create_tablet_workers;
-    }
-    if (_drop_tablet_workers != NULL) {
-        delete _drop_tablet_workers;
-    }
-    if (_push_workers != NULL) {
-        delete _push_workers;
-    }
-    if (_publish_version_workers != NULL) {
-        delete _publish_version_workers;
-    }
-    if (_clear_transaction_task_workers != NULL) {
-        delete _clear_transaction_task_workers;
-    }
-    if (_delete_workers != NULL) {
-        delete _delete_workers;
-    }
-    if (_alter_tablet_workers != NULL) {
-        delete _alter_tablet_workers;
-    }
-    if (_clone_workers != NULL) {
-        delete _clone_workers;
-    }
-    if (_storage_medium_migrate_workers != NULL) {
-        delete _storage_medium_migrate_workers;
-    }
-    if (_check_consistency_workers != NULL) {
-        delete _check_consistency_workers;
-    }
-    if (_report_task_workers != NULL) {
-        delete _report_task_workers;
-    }
-    if (_report_disk_state_workers != NULL) {
-        delete _report_disk_state_workers;
-    }
-    if (_report_tablet_workers != NULL) {
-        delete _report_tablet_workers;
-    }
-    if (_upload_workers != NULL) {
-        delete _upload_workers;
-    }
-    if (_download_workers != NULL) {
-        delete _download_workers;
-    }
-    if (_make_snapshot_workers != NULL) {
-        delete _make_snapshot_workers;
-    }
-    if (_move_dir_workers!= NULL) {
-        delete _move_dir_workers;
-    }
-    if (_recover_tablet_workers != NULL) {
-        delete _recover_tablet_workers;
-    }
-
-    if (_update_tablet_meta_info_workers != NULL) {
-        delete _update_tablet_meta_info_workers;
-    }
-    if (_release_snapshot_workers != NULL) {
-        delete _release_snapshot_workers;
-    }
-    if (_topic_subscriber !=NULL) {
-        delete _topic_subscriber;
-    }
-}
-
-void AgentServer::submit_tasks(
-        TAgentResult& return_value,
-        const vector<TAgentTaskRequest>& tasks) {
+AgentServer::~AgentServer() { }
 
-    // Set result to dm
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
+void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) {
+    Status ret_st;
 
-    // TODO check require master same to heartbeat master
-    if (_master_info.network_address.hostname == ""
-            || _master_info.network_address.port == 0) {
-        error_msgs.push_back("Not get master heartbeat yet.");
-        return_value.status.__set_error_msgs(error_msgs);
-        return_value.status.__set_status_code(TStatusCode::CANCELLED);
+    // TODO check master_info here if it is the same with that of heartbeat rpc
+    if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
+        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
+        ret_st.to_thrift(&agent_result.status);
         return;
     }
 
     for (auto task : tasks) {
+        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
         TTaskType::type task_type = task.task_type;
         int64_t signature = task.signature;
 
+#define HANDLE_TYPE(t_task_type, work_pool, req_member)                     \
+    case t_task_type:                                                       \
+        if (task.__isset.req_member) {                                      \
+            work_pool->submit_task(task);                                   \
+        }                                                                   \
+        ret_st = Status::InvalidArgument(strings::Substitute(               \
+                "task(signature=$0) has wrong request member", signature)); \
+        break;
+
+        // TODO(lingbin): divided these task types into several categories
         switch (task_type) {
-        case TTaskType::CREATE:
-            if (task.__isset.create_tablet_req) {
-               _create_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DROP:
-            if (task.__isset.drop_tablet_req) {
-                _drop_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
+        HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, create_tablet_req);
+        HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, drop_tablet_req);
+        HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, publish_version_req);
+        HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK,
+                    _clear_transaction_task_workers,
+                    clear_transaction_task_req);
+        HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
+        HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE,
+                    _storage_medium_migrate_workers,
+                    storage_medium_migrate_req);
+        HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY,
+                    _check_consistency_workers,
+                    check_consistency_req);
+        HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
+        HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
+        HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, snapshot_req);
+        HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers, release_snapshot_req);
+        HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
+        HANDLE_TYPE(TTaskType::RECOVER_TABLET, _recover_tablet_workers, recover_tablet_req);
+        HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO,
+                    _update_tablet_meta_info_workers,
+                    update_tablet_meta_info_req);
+
         case TTaskType::REALTIME_PUSH:
         case TTaskType::PUSH:
-            if (task.__isset.push_req) {
-                if (task.push_req.push_type == TPushType::LOAD
-                        || task.push_req.push_type == TPushType::LOAD_DELETE) {
-                    _push_workers->submit_task(task);
-                } else if (task.push_req.push_type == TPushType::DELETE) {
-                    _delete_workers->submit_task(task);
-                } else {
-                    status_code = TStatusCode::ANALYSIS_ERROR;
-                }
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+            if (!task.__isset.push_req) {
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
+                break;
             }
-            break;
-        case TTaskType::PUBLISH_VERSION:
-            if (task.__isset.publish_version_req) {
-                _publish_version_workers->submit_task(task);
+            if (task.push_req.push_type == TPushType::LOAD
+                    || task.push_req.push_type == TPushType::LOAD_DELETE) {
+                _push_workers->submit_task(task);
+            } else if (task.push_req.push_type == TPushType::DELETE) {
+                _delete_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLEAR_TRANSACTION_TASK:
-            if (task.__isset.clear_transaction_task_req) {
-                _clear_transaction_task_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0, type=$1, push_type=$2) has wrong push_type",
+                        signature, task_type, task.push_req.push_type));
             }
             break;
         case TTaskType::ALTER:
             if (task.__isset.alter_tablet_req || task.__isset.alter_tablet_req_v2) {
                 _alter_tablet_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLONE:
-            if (task.__isset.clone_req) {
-                _clone_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::STORAGE_MEDIUM_MIGRATE:
-            if (task.__isset.storage_medium_migrate_req) {
-                _storage_medium_migrate_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CHECK_CONSISTENCY:
-            if (task.__isset.check_consistency_req) {
-                _check_consistency_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPLOAD:
-            if (task.__isset.upload_req) {
-                _upload_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DOWNLOAD:
-            if (task.__isset.download_req) {
-                _download_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MAKE_SNAPSHOT:
-            if (task.__isset.snapshot_req) {
-                _make_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RELEASE_SNAPSHOT:
-            if (task.__isset.release_snapshot_req) {
-                _release_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MOVE:
-            if (task.__isset.move_dir_req) {
-                _move_dir_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RECOVER_TABLET:
-            if (task.__isset.recover_tablet_req) {
-                _recover_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPDATE_TABLET_META_INFO:
-            if (task.__isset.update_tablet_meta_info_req) {
-                _update_tablet_meta_info_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
             }
             break;
         default:
-            status_code = TStatusCode::ANALYSIS_ERROR;
+            ret_st = Status::InvalidArgument(strings::Substitute(
+                    "task(signature=$0, type=$1) has wrong task type", signature, task_type));
             break;
         }
+#undef HANDLE_TYPE
 
-        if (status_code == TStatusCode::ANALYSIS_ERROR) {
-            OLAP_LOG_WARNING("task anaysis_error, signature: %ld", signature);
-            error_msgs.push_back("the task signature is:" + to_string(signature) + " has wrong request.");
+        if (!ret_st.ok()) {
+            LOG(WARNING) << "fail to submit task. reason: " << ret_st.get_error_msg()
+                    << ", task: " << task;
+            break;
 
 Review comment:
   Does `break` here mean that some of the tasks are submitted successfully while the rest of them are not?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

Posted by GitBox <gi...@apache.org>.
lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831#discussion_r374593077
 
 

 ##########
 File path: be/src/service/doris_main.cpp
 ##########
 @@ -50,6 +50,7 @@
 #include "agent/topic_subscriber.h"
 #include "util/doris_metrics.h"
 #include "olap/options.h"
+#include "olap/storage_engine.h"
 
 Review comment:
   Yes. `StorageEngine` is used in this file.
   Previously, `storage_engine.h` was included indirectly via some unneeded header files included in `agent_server.h`. Those unneeded header files have been removed from `agent_server.h`, so `storage_engine.h` has to be included explicitly here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831#discussion_r374576859
 
 

 ##########
 File path: be/src/service/doris_main.cpp
 ##########
 @@ -50,6 +50,7 @@
 #include "agent/topic_subscriber.h"
 #include "util/doris_metrics.h"
 #include "olap/options.h"
+#include "olap/storage_engine.h"
 
 Review comment:
   Is it necessary here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

Posted by GitBox <gi...@apache.org>.
lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831#discussion_r374624378
 
 

 ##########
 File path: be/src/agent/agent_server.cpp
 ##########
 @@ -67,438 +47,236 @@ AgentServer::AgentServer(ExecEnv* exec_env,
                 boost::filesystem::remove_all(dpp_download_path);
             }
         } catch (...) {
-            LOG(WARNING) << "boost exception when remove dpp download path. path="
-                         << path.path;
+            LOG(WARNING) << "boost exception when remove dpp download path. path=" << path.path;
         }
     }
 
-    // init task worker pool
-    _create_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
-            _exec_env,
-            master_info);
-    _drop_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DROP_TABLE,
-            _exec_env,
-            master_info);
-    _push_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUSH,
-            _exec_env,
-            master_info);
-    _publish_version_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUBLISH_VERSION,
-            _exec_env,
-            master_info);
-    _clear_transaction_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLEAR_TRANSACTION_TASK,
-            exec_env,
-            master_info);
-    _delete_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DELETE,
-            _exec_env,
-            master_info);
-    _alter_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
-            _exec_env,
-            master_info);
-    _clone_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLONE,
-            _exec_env,
-            master_info);
-    _storage_medium_migrate_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::STORAGE_MEDIUM_MIGRATE,
-            _exec_env,
-            master_info);
-    _check_consistency_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CHECK_CONSISTENCY,
-            _exec_env,
-            master_info);
-    _report_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_TASK,
-            _exec_env,
-            master_info);
-    _report_disk_state_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE,
-            _exec_env,
-            master_info);
-    _report_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE,
-            _exec_env,
-            master_info);
-    _upload_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPLOAD,
-            _exec_env,
-            master_info);
-    _download_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DOWNLOAD,
-            _exec_env,
-            master_info);
-    _make_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _release_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _move_dir_workers= new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MOVE,
-            _exec_env,
-            master_info);
-    _recover_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RECOVER_TABLET,
-            _exec_env,
-            master_info);
-    _update_tablet_meta_info_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO,
-            _exec_env,
-            master_info);
+    // It is the same code to create workers of each type, so we use a macro
+    // to make code to be more readable.
+
+#ifndef BE_TEST
+#define CREATE_AND_START_POOL(type, pool_name)         \
+    pool_name.reset(new TaskWorkerPool(                \
+                TaskWorkerPool::TaskWorkerType::type,  \
+                _exec_env,                             \
+                master_info));                         \
+    pool_name->start();
+#else
+#define CREATE_AND_START_POOL(type, pool_name)
+#endif // BE_TEST
+
+    CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
+    CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
+    // Both PUSH and REALTIME_PUSH type use _push_workers
+    CREATE_AND_START_POOL(PUSH, _push_workers);
+    CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
+    CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers);
+    CREATE_AND_START_POOL(DELETE, _delete_workers);
+    CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
+    CREATE_AND_START_POOL(CLONE, _clone_workers);
+    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers);
+    CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
+    CREATE_AND_START_POOL(REPORT_TASK, _report_task_workers);
+    CREATE_AND_START_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
+    CREATE_AND_START_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
+    CREATE_AND_START_POOL(UPLOAD, _upload_workers);
+    CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
+    CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers);
+    CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
+    CREATE_AND_START_POOL(MOVE, _move_dir_workers);
+    CREATE_AND_START_POOL(RECOVER_TABLET, _recover_tablet_workers);
+    CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers);
+#undef CREATE_AND_START_POOL
+
 #ifndef BE_TEST
-    _create_tablet_workers->start();
-    _drop_tablet_workers->start();
-    _push_workers->start();
-    _publish_version_workers->start();
-    _clear_transaction_task_workers->start();
-    _delete_workers->start();
-    _alter_tablet_workers->start();
-    _clone_workers->start();
-    _storage_medium_migrate_workers->start();
-    _check_consistency_workers->start();
-    _report_task_workers->start();
-    _report_disk_state_workers->start();
-    _report_tablet_workers->start();
-    _upload_workers->start();
-    _download_workers->start();
-    _make_snapshot_workers->start();
-    _release_snapshot_workers->start();
-    _move_dir_workers->start();
-    _recover_tablet_workers->start();
-    _update_tablet_meta_info_workers->start();
     // Add subscriber here and register listeners
     TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
     LOG(INFO) << "Register user resource listener";
     _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener);
 #endif
 }
 
-AgentServer::~AgentServer() {
-    if (_create_tablet_workers != NULL) {
-        delete _create_tablet_workers;
-    }
-    if (_drop_tablet_workers != NULL) {
-        delete _drop_tablet_workers;
-    }
-    if (_push_workers != NULL) {
-        delete _push_workers;
-    }
-    if (_publish_version_workers != NULL) {
-        delete _publish_version_workers;
-    }
-    if (_clear_transaction_task_workers != NULL) {
-        delete _clear_transaction_task_workers;
-    }
-    if (_delete_workers != NULL) {
-        delete _delete_workers;
-    }
-    if (_alter_tablet_workers != NULL) {
-        delete _alter_tablet_workers;
-    }
-    if (_clone_workers != NULL) {
-        delete _clone_workers;
-    }
-    if (_storage_medium_migrate_workers != NULL) {
-        delete _storage_medium_migrate_workers;
-    }
-    if (_check_consistency_workers != NULL) {
-        delete _check_consistency_workers;
-    }
-    if (_report_task_workers != NULL) {
-        delete _report_task_workers;
-    }
-    if (_report_disk_state_workers != NULL) {
-        delete _report_disk_state_workers;
-    }
-    if (_report_tablet_workers != NULL) {
-        delete _report_tablet_workers;
-    }
-    if (_upload_workers != NULL) {
-        delete _upload_workers;
-    }
-    if (_download_workers != NULL) {
-        delete _download_workers;
-    }
-    if (_make_snapshot_workers != NULL) {
-        delete _make_snapshot_workers;
-    }
-    if (_move_dir_workers!= NULL) {
-        delete _move_dir_workers;
-    }
-    if (_recover_tablet_workers != NULL) {
-        delete _recover_tablet_workers;
-    }
-
-    if (_update_tablet_meta_info_workers != NULL) {
-        delete _update_tablet_meta_info_workers;
-    }
-    if (_release_snapshot_workers != NULL) {
-        delete _release_snapshot_workers;
-    }
-    if (_topic_subscriber !=NULL) {
-        delete _topic_subscriber;
-    }
-}
-
-void AgentServer::submit_tasks(
-        TAgentResult& return_value,
-        const vector<TAgentTaskRequest>& tasks) {
+AgentServer::~AgentServer() { }
 
-    // Set result to dm
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
+void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) {
+    Status ret_st;
 
-    // TODO check require master same to heartbeat master
-    if (_master_info.network_address.hostname == ""
-            || _master_info.network_address.port == 0) {
-        error_msgs.push_back("Not get master heartbeat yet.");
-        return_value.status.__set_error_msgs(error_msgs);
-        return_value.status.__set_status_code(TStatusCode::CANCELLED);
+    // TODO check master_info here if it is the same with that of heartbeat rpc
+    if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
+        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
+        ret_st.to_thrift(&agent_result.status);
         return;
     }
 
     for (auto task : tasks) {
+        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
         TTaskType::type task_type = task.task_type;
         int64_t signature = task.signature;
 
+#define HANDLE_TYPE(t_task_type, work_pool, req_member)                     \
+    case t_task_type:                                                       \
+        if (task.__isset.req_member) {                                      \
+            work_pool->submit_task(task);                                   \
+        }                                                                   \
+        ret_st = Status::InvalidArgument(strings::Substitute(               \
+                "task(signature=$0) has wrong request member", signature)); \
+        break;
+
+        // TODO(lingbin): divided these task types into several categories
         switch (task_type) {
-        case TTaskType::CREATE:
-            if (task.__isset.create_tablet_req) {
-               _create_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DROP:
-            if (task.__isset.drop_tablet_req) {
-                _drop_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
+        HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, create_tablet_req);
+        HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, drop_tablet_req);
+        HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, publish_version_req);
+        HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK,
+                    _clear_transaction_task_workers,
+                    clear_transaction_task_req);
+        HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
+        HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE,
+                    _storage_medium_migrate_workers,
+                    storage_medium_migrate_req);
+        HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY,
+                    _check_consistency_workers,
+                    check_consistency_req);
+        HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
+        HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
+        HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, snapshot_req);
+        HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers, release_snapshot_req);
+        HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
+        HANDLE_TYPE(TTaskType::RECOVER_TABLET, _recover_tablet_workers, recover_tablet_req);
+        HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO,
+                    _update_tablet_meta_info_workers,
+                    update_tablet_meta_info_req);
+
         case TTaskType::REALTIME_PUSH:
         case TTaskType::PUSH:
-            if (task.__isset.push_req) {
-                if (task.push_req.push_type == TPushType::LOAD
-                        || task.push_req.push_type == TPushType::LOAD_DELETE) {
-                    _push_workers->submit_task(task);
-                } else if (task.push_req.push_type == TPushType::DELETE) {
-                    _delete_workers->submit_task(task);
-                } else {
-                    status_code = TStatusCode::ANALYSIS_ERROR;
-                }
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+            if (!task.__isset.push_req) {
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
+                break;
             }
-            break;
-        case TTaskType::PUBLISH_VERSION:
-            if (task.__isset.publish_version_req) {
-                _publish_version_workers->submit_task(task);
+            if (task.push_req.push_type == TPushType::LOAD
+                    || task.push_req.push_type == TPushType::LOAD_DELETE) {
+                _push_workers->submit_task(task);
+            } else if (task.push_req.push_type == TPushType::DELETE) {
+                _delete_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLEAR_TRANSACTION_TASK:
-            if (task.__isset.clear_transaction_task_req) {
-                _clear_transaction_task_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0, type=$1, push_type=$2) has wrong push_type",
+                        signature, task_type, task.push_req.push_type));
             }
             break;
         case TTaskType::ALTER:
             if (task.__isset.alter_tablet_req || task.__isset.alter_tablet_req_v2) {
                 _alter_tablet_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLONE:
-            if (task.__isset.clone_req) {
-                _clone_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::STORAGE_MEDIUM_MIGRATE:
-            if (task.__isset.storage_medium_migrate_req) {
-                _storage_medium_migrate_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CHECK_CONSISTENCY:
-            if (task.__isset.check_consistency_req) {
-                _check_consistency_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPLOAD:
-            if (task.__isset.upload_req) {
-                _upload_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DOWNLOAD:
-            if (task.__isset.download_req) {
-                _download_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MAKE_SNAPSHOT:
-            if (task.__isset.snapshot_req) {
-                _make_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RELEASE_SNAPSHOT:
-            if (task.__isset.release_snapshot_req) {
-                _release_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MOVE:
-            if (task.__isset.move_dir_req) {
-                _move_dir_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RECOVER_TABLET:
-            if (task.__isset.recover_tablet_req) {
-                _recover_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPDATE_TABLET_META_INFO:
-            if (task.__isset.update_tablet_meta_info_req) {
-                _update_tablet_meta_info_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
             }
             break;
         default:
-            status_code = TStatusCode::ANALYSIS_ERROR;
+            ret_st = Status::InvalidArgument(strings::Substitute(
+                    "task(signature=$0, type=$1) has wrong task type", signature, task_type));
             break;
         }
+#undef HANDLE_TYPE
 
-        if (status_code == TStatusCode::ANALYSIS_ERROR) {
-            OLAP_LOG_WARNING("task anaysis_error, signature: %ld", signature);
-            error_msgs.push_back("the task signature is:" + to_string(signature) + " has wrong request.");
+        if (!ret_st.ok()) {
+            LOG(WARNING) << "fail to submit task. reason: " << ret_st.get_error_msg()
+                    << ", task: " << task;
+            break;
 
 Review comment:
   Anyway, I removed the `break` and added some comments and a TODO.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable

Posted by GitBox <gi...@apache.org>.
lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831#discussion_r375334190
 
 

 ##########
 File path: be/src/agent/agent_server.cpp
 ##########
 @@ -67,438 +47,236 @@ AgentServer::AgentServer(ExecEnv* exec_env,
                 boost::filesystem::remove_all(dpp_download_path);
             }
         } catch (...) {
-            LOG(WARNING) << "boost exception when remove dpp download path. path="
-                         << path.path;
+            LOG(WARNING) << "boost exception when remove dpp download path. path=" << path.path;
         }
     }
 
-    // init task worker pool
-    _create_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
-            _exec_env,
-            master_info);
-    _drop_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DROP_TABLE,
-            _exec_env,
-            master_info);
-    _push_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUSH,
-            _exec_env,
-            master_info);
-    _publish_version_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUBLISH_VERSION,
-            _exec_env,
-            master_info);
-    _clear_transaction_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLEAR_TRANSACTION_TASK,
-            exec_env,
-            master_info);
-    _delete_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DELETE,
-            _exec_env,
-            master_info);
-    _alter_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
-            _exec_env,
-            master_info);
-    _clone_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLONE,
-            _exec_env,
-            master_info);
-    _storage_medium_migrate_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::STORAGE_MEDIUM_MIGRATE,
-            _exec_env,
-            master_info);
-    _check_consistency_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CHECK_CONSISTENCY,
-            _exec_env,
-            master_info);
-    _report_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_TASK,
-            _exec_env,
-            master_info);
-    _report_disk_state_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE,
-            _exec_env,
-            master_info);
-    _report_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE,
-            _exec_env,
-            master_info);
-    _upload_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPLOAD,
-            _exec_env,
-            master_info);
-    _download_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DOWNLOAD,
-            _exec_env,
-            master_info);
-    _make_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _release_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _move_dir_workers= new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MOVE,
-            _exec_env,
-            master_info);
-    _recover_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RECOVER_TABLET,
-            _exec_env,
-            master_info);
-    _update_tablet_meta_info_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO,
-            _exec_env,
-            master_info);
+    // It is the same code to create workers of each type, so we use a macro
+    // to make code to be more readable.
+
+#ifndef BE_TEST
+#define CREATE_AND_START_POOL(type, pool_name)         \
+    pool_name.reset(new TaskWorkerPool(                \
+                TaskWorkerPool::TaskWorkerType::type,  \
+                _exec_env,                             \
+                master_info));                         \
+    pool_name->start();
+#else
+#define CREATE_AND_START_POOL(type, pool_name)
+#endif // BE_TEST
+
+    CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
+    CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
+    // Both PUSH and REALTIME_PUSH type use _push_workers
+    CREATE_AND_START_POOL(PUSH, _push_workers);
+    CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
+    CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers);
+    CREATE_AND_START_POOL(DELETE, _delete_workers);
+    CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
+    CREATE_AND_START_POOL(CLONE, _clone_workers);
+    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers);
+    CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
+    CREATE_AND_START_POOL(REPORT_TASK, _report_task_workers);
+    CREATE_AND_START_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
+    CREATE_AND_START_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
+    CREATE_AND_START_POOL(UPLOAD, _upload_workers);
+    CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
+    CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers);
+    CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
+    CREATE_AND_START_POOL(MOVE, _move_dir_workers);
+    CREATE_AND_START_POOL(RECOVER_TABLET, _recover_tablet_workers);
+    CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers);
+#undef CREATE_AND_START_POOL
+
 #ifndef BE_TEST
-    _create_tablet_workers->start();
-    _drop_tablet_workers->start();
-    _push_workers->start();
-    _publish_version_workers->start();
-    _clear_transaction_task_workers->start();
-    _delete_workers->start();
-    _alter_tablet_workers->start();
-    _clone_workers->start();
-    _storage_medium_migrate_workers->start();
-    _check_consistency_workers->start();
-    _report_task_workers->start();
-    _report_disk_state_workers->start();
-    _report_tablet_workers->start();
-    _upload_workers->start();
-    _download_workers->start();
-    _make_snapshot_workers->start();
-    _release_snapshot_workers->start();
-    _move_dir_workers->start();
-    _recover_tablet_workers->start();
-    _update_tablet_meta_info_workers->start();
     // Add subscriber here and register listeners
     TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
     LOG(INFO) << "Register user resource listener";
     _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener);
 #endif
 }
 
-AgentServer::~AgentServer() {
-    if (_create_tablet_workers != NULL) {
-        delete _create_tablet_workers;
-    }
-    if (_drop_tablet_workers != NULL) {
-        delete _drop_tablet_workers;
-    }
-    if (_push_workers != NULL) {
-        delete _push_workers;
-    }
-    if (_publish_version_workers != NULL) {
-        delete _publish_version_workers;
-    }
-    if (_clear_transaction_task_workers != NULL) {
-        delete _clear_transaction_task_workers;
-    }
-    if (_delete_workers != NULL) {
-        delete _delete_workers;
-    }
-    if (_alter_tablet_workers != NULL) {
-        delete _alter_tablet_workers;
-    }
-    if (_clone_workers != NULL) {
-        delete _clone_workers;
-    }
-    if (_storage_medium_migrate_workers != NULL) {
-        delete _storage_medium_migrate_workers;
-    }
-    if (_check_consistency_workers != NULL) {
-        delete _check_consistency_workers;
-    }
-    if (_report_task_workers != NULL) {
-        delete _report_task_workers;
-    }
-    if (_report_disk_state_workers != NULL) {
-        delete _report_disk_state_workers;
-    }
-    if (_report_tablet_workers != NULL) {
-        delete _report_tablet_workers;
-    }
-    if (_upload_workers != NULL) {
-        delete _upload_workers;
-    }
-    if (_download_workers != NULL) {
-        delete _download_workers;
-    }
-    if (_make_snapshot_workers != NULL) {
-        delete _make_snapshot_workers;
-    }
-    if (_move_dir_workers!= NULL) {
-        delete _move_dir_workers;
-    }
-    if (_recover_tablet_workers != NULL) {
-        delete _recover_tablet_workers;
-    }
-
-    if (_update_tablet_meta_info_workers != NULL) {
-        delete _update_tablet_meta_info_workers;
-    }
-    if (_release_snapshot_workers != NULL) {
-        delete _release_snapshot_workers;
-    }
-    if (_topic_subscriber !=NULL) {
-        delete _topic_subscriber;
-    }
-}
-
-void AgentServer::submit_tasks(
-        TAgentResult& return_value,
-        const vector<TAgentTaskRequest>& tasks) {
+AgentServer::~AgentServer() { }
 
-    // Set result to dm
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
+void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) {
+    Status ret_st;
 
-    // TODO check require master same to heartbeat master
-    if (_master_info.network_address.hostname == ""
-            || _master_info.network_address.port == 0) {
-        error_msgs.push_back("Not get master heartbeat yet.");
-        return_value.status.__set_error_msgs(error_msgs);
-        return_value.status.__set_status_code(TStatusCode::CANCELLED);
+    // TODO check master_info here if it is the same with that of heartbeat rpc
+    if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
+        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
+        ret_st.to_thrift(&agent_result.status);
         return;
     }
 
     for (auto task : tasks) {
+        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
         TTaskType::type task_type = task.task_type;
         int64_t signature = task.signature;
 
+#define HANDLE_TYPE(t_task_type, work_pool, req_member)                     \
+    case t_task_type:                                                       \
+        if (task.__isset.req_member) {                                      \
+            work_pool->submit_task(task);                                   \
+        }                                                                   \
+        ret_st = Status::InvalidArgument(strings::Substitute(               \
+                "task(signature=$0) has wrong request member", signature)); \
+        break;
+
+        // TODO(lingbin): divided these task types into several categories
         switch (task_type) {
-        case TTaskType::CREATE:
-            if (task.__isset.create_tablet_req) {
-               _create_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DROP:
-            if (task.__isset.drop_tablet_req) {
-                _drop_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
+        HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, create_tablet_req);
+        HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, drop_tablet_req);
+        HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, publish_version_req);
+        HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK,
+                    _clear_transaction_task_workers,
+                    clear_transaction_task_req);
+        HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
+        HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE,
+                    _storage_medium_migrate_workers,
+                    storage_medium_migrate_req);
+        HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY,
+                    _check_consistency_workers,
+                    check_consistency_req);
+        HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
+        HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
+        HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, snapshot_req);
+        HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers, release_snapshot_req);
+        HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
+        HANDLE_TYPE(TTaskType::RECOVER_TABLET, _recover_tablet_workers, recover_tablet_req);
+        HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO,
+                    _update_tablet_meta_info_workers,
+                    update_tablet_meta_info_req);
+
         case TTaskType::REALTIME_PUSH:
         case TTaskType::PUSH:
-            if (task.__isset.push_req) {
-                if (task.push_req.push_type == TPushType::LOAD
-                        || task.push_req.push_type == TPushType::LOAD_DELETE) {
-                    _push_workers->submit_task(task);
-                } else if (task.push_req.push_type == TPushType::DELETE) {
-                    _delete_workers->submit_task(task);
-                } else {
-                    status_code = TStatusCode::ANALYSIS_ERROR;
-                }
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+            if (!task.__isset.push_req) {
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
+                break;
             }
-            break;
-        case TTaskType::PUBLISH_VERSION:
-            if (task.__isset.publish_version_req) {
-                _publish_version_workers->submit_task(task);
+            if (task.push_req.push_type == TPushType::LOAD
+                    || task.push_req.push_type == TPushType::LOAD_DELETE) {
+                _push_workers->submit_task(task);
+            } else if (task.push_req.push_type == TPushType::DELETE) {
+                _delete_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLEAR_TRANSACTION_TASK:
-            if (task.__isset.clear_transaction_task_req) {
-                _clear_transaction_task_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0, type=$1, push_type=$2) has wrong push_type",
+                        signature, task_type, task.push_req.push_type));
             }
             break;
         case TTaskType::ALTER:
             if (task.__isset.alter_tablet_req || task.__isset.alter_tablet_req_v2) {
                 _alter_tablet_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLONE:
-            if (task.__isset.clone_req) {
-                _clone_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::STORAGE_MEDIUM_MIGRATE:
-            if (task.__isset.storage_medium_migrate_req) {
-                _storage_medium_migrate_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CHECK_CONSISTENCY:
-            if (task.__isset.check_consistency_req) {
-                _check_consistency_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPLOAD:
-            if (task.__isset.upload_req) {
-                _upload_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DOWNLOAD:
-            if (task.__isset.download_req) {
-                _download_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MAKE_SNAPSHOT:
-            if (task.__isset.snapshot_req) {
-                _make_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RELEASE_SNAPSHOT:
-            if (task.__isset.release_snapshot_req) {
-                _release_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MOVE:
-            if (task.__isset.move_dir_req) {
-                _move_dir_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RECOVER_TABLET:
-            if (task.__isset.recover_tablet_req) {
-                _recover_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPDATE_TABLET_META_INFO:
-            if (task.__isset.update_tablet_meta_info_req) {
-                _update_tablet_meta_info_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", signature));
             }
             break;
         default:
-            status_code = TStatusCode::ANALYSIS_ERROR;
+            ret_st = Status::InvalidArgument(strings::Substitute(
+                    "task(signature=$0, type=$1) has wrong task type", signature, task_type));
             break;
         }
+#undef HANDLE_TYPE
 
-        if (status_code == TStatusCode::ANALYSIS_ERROR) {
-            OLAP_LOG_WARNING("task anaysis_error, signature: %ld", signature);
-            error_msgs.push_back("the task signature is:" + to_string(signature) + " has wrong request.");
+        if (!ret_st.ok()) {
+            LOG(WARNING) << "fail to submit task. reason: " << ret_st.get_error_msg()
+                    << ", task: " << task;
+            break;
         }
     }
 
-    return_value.status.__set_error_msgs(error_msgs);
-    return_value.status.__set_status_code(status_code);
+    ret_st.to_thrift(&agent_result.status);
 }
 
-void AgentServer::make_snapshot(TAgentResult& return_value,
-        const TSnapshotRequest& snapshot_request) {
-    TStatus status;
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
-    return_value.__set_snapshot_version(snapshot_request.preferred_snapshot_version);
+void AgentServer::make_snapshot(TAgentResult& t_agent_result,
+                                const TSnapshotRequest& snapshot_request) {
+    Status ret_st;
     string snapshot_path;
-    OLAPStatus make_snapshot_status =
+    OLAPStatus err_code =
             SnapshotManager::instance()->make_snapshot(snapshot_request, &snapshot_path);
-    if (make_snapshot_status != OLAP_SUCCESS) {
-        status_code = TStatusCode::RUNTIME_ERROR;
-        OLAP_LOG_WARNING("make_snapshot failed. tablet_id: %ld, schema_hash: %ld, status: %d",
-                         snapshot_request.tablet_id, snapshot_request.schema_hash,
-                         make_snapshot_status);
-        error_msgs.push_back("make_snapshot failed. status: " +
-                             boost::lexical_cast<string>(make_snapshot_status));
+    if (err_code != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to make_snapshot. tablet_id=" << snapshot_request.tablet_id
+                     << ", schema_hash=" << snapshot_request.schema_hash
+                     << ", error_code=" << err_code;
+        ret_st = Status::RuntimeError(strings::Substitute(
+                    "fail to make_snapshot. err_code=$0", err_code));
     } else {
-        LOG(INFO) << "make_snapshot success. tablet_id: " << snapshot_request.tablet_id
-                  << " schema_hash: " << snapshot_request.schema_hash << " snapshot_path: " << snapshot_path;
-        return_value.__set_snapshot_path(snapshot_path);
+        LOG(INFO) << "success to make_snapshot. tablet_id=" << snapshot_request.tablet_id
+                  << ", schema_hash=" << snapshot_request.schema_hash
+                  << ", snapshot_path: " << snapshot_path;
+        t_agent_result.__set_snapshot_path(snapshot_path);
     }
 
-    status.__set_error_msgs(error_msgs);
-    status.__set_status_code(status_code);
-    return_value.__set_status(status);
+    ret_st.to_thrift(&t_agent_result.status);
+    t_agent_result.__set_snapshot_version(snapshot_request.preferred_snapshot_version);
     if (snapshot_request.__isset.allow_incremental_clone) {
-        return_value.__set_allow_incremental_clone(snapshot_request.allow_incremental_clone);
+        t_agent_result.__set_allow_incremental_clone(snapshot_request.allow_incremental_clone);
     }
 }
 
-void AgentServer::release_snapshot(TAgentResult& return_value, const std::string& snapshot_path) {
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
-
-    OLAPStatus release_snapshot_status =
-            SnapshotManager::instance()->release_snapshot(snapshot_path);
-    if (release_snapshot_status != OLAP_SUCCESS) {
-        status_code = TStatusCode::RUNTIME_ERROR;
-        LOG(WARNING) << "release_snapshot failed. snapshot_path: " << snapshot_path << ", status: " << release_snapshot_status;
-        error_msgs.push_back("release_snapshot failed. status: " +
-                             boost::lexical_cast<string>(release_snapshot_status));
+void AgentServer::release_snapshot(TAgentResult& t_agent_result, const std::string& snapshot_path) {
+    Status ret_st;
+    OLAPStatus err_code = SnapshotManager::instance()->release_snapshot(snapshot_path);
+    if (err_code != OLAP_SUCCESS) {
+        LOG(WARNING) << "failt to release_snapshot. snapshot_path: " << snapshot_path
+                     << ", err_code: " << err_code;
+        ret_st = Status::RuntimeError(strings::Substitute(
+                    "fail to release_snapshot. err_code=$0", err_code));
     } else {
-        LOG(INFO) << "release_snapshot success. snapshot_path: " << snapshot_path << ", status: " << release_snapshot_status;
+        LOG(INFO) << "success to release_snapshot. snapshot_path=" << snapshot_path
+                  << ", err_code=" << err_code;
     }
-
-    return_value.status.__set_error_msgs(error_msgs);
-    return_value.status.__set_status_code(status_code);
+    ret_st.to_thrift(&t_agent_result.status);
 }
 
-void AgentServer::publish_cluster_state(TAgentResult& _return, const TAgentPublishRequest& request) {
-    vector<string> error_msgs;
+// TODO(lingbin): always return OK?
+void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,
 
 Review comment:
   Unfortunately, `Multi Load` still uses these `xxx_etl_xxx()` methods, so they have to be kept.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org