You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this copy.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/03/09 14:07:45 UTC

[incubator-doris] branch master updated: [Bug] Fix invalid rollback for stream load txn (#3054)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b9b9a11  [Bug] Fix invalid rollback for stream load txn (#3054)
b9b9a11 is described below

commit b9b9a11eae97249163c9d193745ee9bff2a889b6
Author: WingC <10...@qq.com>
AuthorDate: Mon Mar 9 09:07:36 2020 -0500

    [Bug] Fix invalid rollback for stream load txn (#3054)
---
 .../runtime/stream_load/stream_load_executor.cpp   | 215 +++++++++++----------
 1 file changed, 111 insertions(+), 104 deletions(-)

diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 6fc4912..14e17be 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -25,8 +25,8 @@
 #include "runtime/plan_fragment_executor.h"
 #include "runtime/runtime_state.h"
 #include "runtime/stream_load/stream_load_context.h"
-#include "util/thrift_rpc_helper.h"
 #include "util/doris_metrics.h"
+#include "util/thrift_rpc_helper.h"
 
 #include "gen_cpp/FrontendService.h"
 #include "gen_cpp/FrontendService_types.h"
@@ -44,65 +44,67 @@ Status k_stream_load_plan_status;
 
 Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
     DorisMetrics::txn_exec_plan_total.increment(1);
-    // submit this params
+// submit this params
 #ifndef BE_TEST
     ctx->ref();
-    LOG(INFO) << "begin to execute job. label=" << ctx->label
-              << ", txn_id=" << ctx->txn_id
+    LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id
               << ", query_id=" << print_id(ctx->put_result.params.params.query_id);
     auto st = _exec_env->fragment_mgr()->exec_plan_fragment(
-        ctx->put_result.params,
-        [ctx] (PlanFragmentExecutor* executor) {
-            ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos());
-            Status status = executor->status();
-            if (status.ok()) {
-                ctx->number_total_rows = executor->runtime_state()->num_rows_load_total();
-                ctx->number_loaded_rows = executor->runtime_state()->num_rows_load_success();
-                ctx->number_filtered_rows = executor->runtime_state()->num_rows_load_filtered();
-                ctx->number_unselected_rows = executor->runtime_state()->num_rows_load_unselected();
+            ctx->put_result.params, [ctx](PlanFragmentExecutor* executor) {
+                ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos());
+                Status status = executor->status();
+                if (status.ok()) {
+                    ctx->number_total_rows = executor->runtime_state()->num_rows_load_total();
+                    ctx->number_loaded_rows = executor->runtime_state()->num_rows_load_success();
+                    ctx->number_filtered_rows = executor->runtime_state()->num_rows_load_filtered();
+                    ctx->number_unselected_rows =
+                            executor->runtime_state()->num_rows_load_unselected();
 
-                int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
-                if ((double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
-                    // NOTE: Do not modify the error message here, for historical reasons,
-                    // some users may rely on this error message.
-                    status = Status::InternalError("too many filtered rows");
-                } else if(ctx->number_loaded_rows == 0){
-                    status = Status::InternalError("all partitions have no load data");
-                }
-                if (ctx->number_filtered_rows > 0 &&
-                    !executor->runtime_state()->get_error_log_file_path().empty()) {
-                    ctx->error_url = to_load_error_http_path(
-                            executor->runtime_state()->get_error_log_file_path());
-                }
+                    int64_t num_selected_rows =
+                            ctx->number_total_rows - ctx->number_unselected_rows;
+                    if ((double)ctx->number_filtered_rows / num_selected_rows >
+                        ctx->max_filter_ratio) {
+                        // NOTE: Do not modify the error message here, for historical
+                        // reasons,
+                        // some users may rely on this error message.
+                        status = Status::InternalError("too many filtered rows");
+                    } else if (ctx->number_loaded_rows == 0) {
+                        status = Status::InternalError("all partitions have no load data");
+                    }
+                    if (ctx->number_filtered_rows > 0 &&
+                        !executor->runtime_state()->get_error_log_file_path().empty()) {
+                        ctx->error_url = to_load_error_http_path(
+                                executor->runtime_state()->get_error_log_file_path());
+                    }
 
-                if (status.ok()) {
-                    DorisMetrics::stream_receive_bytes_total.increment(ctx->receive_bytes);
-                    DorisMetrics::stream_load_rows_total.increment(ctx->number_loaded_rows);
-                }
-            } else {
-                LOG(WARNING) << "fragment execute failed"
-                    << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
-                    << ", err_msg=" << status.get_error_msg()
-                    << ctx->brief();
-                // cancel body_sink, make sender known it
-                if (ctx->body_sink != nullptr) {
-                    ctx->body_sink->cancel();
-                }
+                    if (status.ok()) {
+                        DorisMetrics::stream_receive_bytes_total.increment(ctx->receive_bytes);
+                        DorisMetrics::stream_load_rows_total.increment(ctx->number_loaded_rows);
+                    }
+                } else {
+                    LOG(WARNING) << "fragment execute failed"
+                                 << ", query_id="
+                                 << UniqueId(ctx->put_result.params.params.query_id)
+                                 << ", err_msg=" << status.get_error_msg() << ctx->brief();
+                    // cancel body_sink, make sender known it
+                    if (ctx->body_sink != nullptr) {
+                        ctx->body_sink->cancel();
+                    }
 
-                switch(ctx->load_src_type) {
+                    switch (ctx->load_src_type) {
                     // reset the stream load ctx's kafka commit offset
                     case TLoadSourceType::KAFKA:
                         ctx->kafka_info->reset_offset();
                         break;
                     default:
                         break;
+                    }
                 }
-            }
-            ctx->promise.set_value(status);
-            if (ctx->unref()) {
-                delete ctx;
-            }
-        });
+                ctx->promise.set_value(status);
+                if (ctx->unref()) {
+                    delete ctx;
+                }
+            });
     if (!st.ok()) {
         // no need to check unref's return value
         ctx->unref();
@@ -134,7 +136,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
 #ifndef BE_TEST
     RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
             master_addr.hostname, master_addr.port,
-            [&request, &result] (FrontendServiceConnection& client) {
+            [&request, &result](FrontendServiceConnection& client) {
                 client->loadTxnBegin(result, request);
             }));
 #else
@@ -143,7 +145,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
     Status status(result.status);
     if (!status.ok()) {
         LOG(WARNING) << "begin transaction failed, errmsg=" << status.get_error_msg()
-            << ctx->brief();
+                     << ctx->brief();
         if (result.__isset.job_status) {
             ctx->existing_job_status = result.job_status;
         }
@@ -178,19 +180,24 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
     TLoadTxnCommitResult result;
 #ifndef BE_TEST
     RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
-        master_addr.hostname, master_addr.port,
-        [&request, &result] (FrontendServiceConnection& client) {
-            client->loadTxnCommit(result, request);
-        }, config::txn_commit_rpc_timeout_ms));
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+                client->loadTxnCommit(result, request);
+            },
+            config::txn_commit_rpc_timeout_ms));
 #else
     result = k_stream_load_commit_result;
 #endif
-    // Return if this transaction is committed successful; otherwise, we need try to
+    // Return if this transaction is committed successful; otherwise, we need try
+    // to
     // rollback this transaction
     Status status(result.status);
     if (!status.ok()) {
         LOG(WARNING) << "commit transaction failed, errmsg=" << status.get_error_msg()
-            << ctx->brief();
+                     << ctx->brief();
+        if (status.code() == TStatusCode::PUBLISH_TIMEOUT) {
+            ctx->need_rollback = false;
+        }
         return status;
     }
     // commit success, set need_rollback to false
@@ -219,13 +226,13 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
     TLoadTxnRollbackResult result;
 #ifndef BE_TEST
     auto rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
-        master_addr.hostname, master_addr.port,
-        [&request, &result] (FrontendServiceConnection& client) {
-            client->loadTxnRollback(result, request);
-        });
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+                client->loadTxnRollback(result, request);
+            });
     if (!rpc_st.ok()) {
         LOG(WARNING) << "transaction rollback failed. errmsg=" << rpc_st.get_error_msg()
-                << ctx->brief();
+                     << ctx->brief();
     }
 #else
     result = k_stream_load_rollback_result;
@@ -237,60 +244,60 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt
         // currently, only routine load and mini load need to be set attachment
         return false;
     }
-    switch(ctx->load_type) {
-        case TLoadType::MINI_LOAD: {
-            attach->loadType = TLoadType::MINI_LOAD;
-
-            TMiniLoadTxnCommitAttachment ml_attach;
-            ml_attach.loadedRows = ctx->number_loaded_rows;
-            ml_attach.filteredRows = ctx->number_filtered_rows;
-            if (!ctx->error_url.empty()) {
-                ml_attach.__set_errorLogUrl(ctx->error_url);
-            }
+    switch (ctx->load_type) {
+    case TLoadType::MINI_LOAD: {
+        attach->loadType = TLoadType::MINI_LOAD;
 
-            attach->mlTxnCommitAttachment = std::move(ml_attach);
-            attach->__isset.mlTxnCommitAttachment = true;
-            break;
+        TMiniLoadTxnCommitAttachment ml_attach;
+        ml_attach.loadedRows = ctx->number_loaded_rows;
+        ml_attach.filteredRows = ctx->number_filtered_rows;
+        if (!ctx->error_url.empty()) {
+            ml_attach.__set_errorLogUrl(ctx->error_url);
         }
-        case TLoadType::ROUTINE_LOAD: {
-            attach->loadType = TLoadType::ROUTINE_LOAD;
 
-            TRLTaskTxnCommitAttachment rl_attach;
-            rl_attach.jobId = ctx->job_id;
-            rl_attach.id = ctx->id.to_thrift();
-            rl_attach.__set_loadedRows(ctx->number_loaded_rows);
-            rl_attach.__set_filteredRows(ctx->number_filtered_rows);
-            rl_attach.__set_unselectedRows(ctx->number_unselected_rows);
-            rl_attach.__set_receivedBytes(ctx->receive_bytes);
-            rl_attach.__set_loadedBytes(ctx->loaded_bytes);
-            rl_attach.__set_loadCostMs(ctx->load_cost_nanos / 1000 / 1000);
+        attach->mlTxnCommitAttachment = std::move(ml_attach);
+        attach->__isset.mlTxnCommitAttachment = true;
+        break;
+    }
+    case TLoadType::ROUTINE_LOAD: {
+        attach->loadType = TLoadType::ROUTINE_LOAD;
 
-            attach->rlTaskTxnCommitAttachment = std::move(rl_attach);
-            attach->__isset.rlTaskTxnCommitAttachment = true;
-            break;
-        }
-        default:
-            // unknown load type, should not happend
-            return false;
+        TRLTaskTxnCommitAttachment rl_attach;
+        rl_attach.jobId = ctx->job_id;
+        rl_attach.id = ctx->id.to_thrift();
+        rl_attach.__set_loadedRows(ctx->number_loaded_rows);
+        rl_attach.__set_filteredRows(ctx->number_filtered_rows);
+        rl_attach.__set_unselectedRows(ctx->number_unselected_rows);
+        rl_attach.__set_receivedBytes(ctx->receive_bytes);
+        rl_attach.__set_loadedBytes(ctx->loaded_bytes);
+        rl_attach.__set_loadCostMs(ctx->load_cost_nanos / 1000 / 1000);
+
+        attach->rlTaskTxnCommitAttachment = std::move(rl_attach);
+        attach->__isset.rlTaskTxnCommitAttachment = true;
+        break;
+    }
+    default:
+        // unknown load type, should not happend
+        return false;
     }
 
-    switch(ctx->load_src_type) {
-        case TLoadSourceType::KAFKA: {
-            TRLTaskTxnCommitAttachment &rl_attach = attach->rlTaskTxnCommitAttachment;
-            rl_attach.loadSourceType = TLoadSourceType::KAFKA;
+    switch (ctx->load_src_type) {
+    case TLoadSourceType::KAFKA: {
+        TRLTaskTxnCommitAttachment& rl_attach = attach->rlTaskTxnCommitAttachment;
+        rl_attach.loadSourceType = TLoadSourceType::KAFKA;
 
-            TKafkaRLTaskProgress kafka_progress;
-            kafka_progress.partitionCmtOffset = ctx->kafka_info->cmt_offset;
+        TKafkaRLTaskProgress kafka_progress;
+        kafka_progress.partitionCmtOffset = ctx->kafka_info->cmt_offset;
 
-            rl_attach.kafkaRLTaskProgress = std::move(kafka_progress);
-            rl_attach.__isset.kafkaRLTaskProgress = true;
-            if (!ctx->error_url.empty()) {
-                rl_attach.__set_errorLogUrl(ctx->error_url);
-            }
-            return true;
+        rl_attach.kafkaRLTaskProgress = std::move(kafka_progress);
+        rl_attach.__isset.kafkaRLTaskProgress = true;
+        if (!ctx->error_url.empty()) {
+            rl_attach.__set_errorLogUrl(ctx->error_url);
         }
-        default:
-            return true;
+        return true;
+    }
+    default:
+        return true;
     }
     return false;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org