Posted to commits@doris.apache.org by ga...@apache.org on 2023/06/01 03:42:50 UTC

[doris] branch master updated: [pipeline](load) support pipeline load (#20217)

This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4387f47fb5 [pipeline](load) support pipeline load (#20217)
4387f47fb5 is described below

commit 4387f47fb52a9e4813e0b1551bc173d9d7186ad4
Author: Gabriel <ga...@gmail.com>
AuthorDate: Thu Jun 1 11:42:43 2023 +0800

    [pipeline](load) support pipeline load (#20217)
---
 be/src/runtime/fragment_mgr.cpp                    |  36 +++-
 .../routine_load/routine_load_task_executor.cpp    |   9 +-
 .../runtime/stream_load/stream_load_executor.cpp   | 201 +++++++++++++-------
 .../doris/load/routineload/KafkaTaskInfo.java      |  17 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  21 +++
 .../apache/doris/planner/StreamLoadPlanner.java    | 202 +++++++++++++++++++++
 .../apache/doris/qe/InsertStreamTxnExecutor.java   | 114 ++++++++----
 .../apache/doris/service/FrontendServiceImpl.java  |  46 ++++-
 gensrc/thrift/BackendService.thrift                |   1 +
 gensrc/thrift/FrontendService.thrift               |   1 +
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 11 files changed, 547 insertions(+), 102 deletions(-)
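
In brief, the patch threads a second plan representation, TPipelineFragmentParams, through stream load and routine load: the FE emits it instead of TExecPlanFragmentParams whenever Config.enable_pipeline_load is set (FrontendServiceImpl, KafkaTaskInfo, InsertStreamTxnExecutor), three Thrift structs each gain an optional field to carry it, and the BE branches on whichever of the two fields the sender populated.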

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 5ac0554104..26646bbd0e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -559,6 +559,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
         stream_load_ctx->txn_id = params.txn_conf.txn_id;
         stream_load_ctx->id = UniqueId(params.params.query_id);
         stream_load_ctx->put_result.params = params;
+        stream_load_ctx->put_result.__isset.params = true;
         stream_load_ctx->use_streaming = true;
         stream_load_ctx->load_type = TLoadType::MANUL_LOAD;
         stream_load_ctx->load_src_type = TLoadSourceType::RAW;
@@ -586,8 +587,39 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
 }
 
 Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params) {
-    // TODO
-    return exec_plan_fragment(params, empty_function);
+    if (params.txn_conf.need_txn) {
+        std::shared_ptr<StreamLoadContext> stream_load_ctx =
+                std::make_shared<StreamLoadContext>(_exec_env);
+        stream_load_ctx->db = params.txn_conf.db;
+        stream_load_ctx->db_id = params.txn_conf.db_id;
+        stream_load_ctx->table = params.txn_conf.tbl;
+        stream_load_ctx->txn_id = params.txn_conf.txn_id;
+        stream_load_ctx->id = UniqueId(params.query_id);
+        stream_load_ctx->put_result.pipeline_params = params;
+        stream_load_ctx->use_streaming = true;
+        stream_load_ctx->load_type = TLoadType::MANUL_LOAD;
+        stream_load_ctx->load_src_type = TLoadSourceType::RAW;
+        stream_load_ctx->label = params.import_label;
+        stream_load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
+        stream_load_ctx->timeout_second = 3600;
+        stream_load_ctx->auth.token = params.txn_conf.token;
+        stream_load_ctx->need_commit_self = true;
+        stream_load_ctx->need_rollback = true;
+        auto pipe = std::make_shared<io::StreamLoadPipe>(
+                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
+                -1 /* total_length */, true /* use_proto */);
+        stream_load_ctx->body_sink = pipe;
+        stream_load_ctx->pipe = pipe;
+        stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
+
+        RETURN_IF_ERROR(
+                _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, stream_load_ctx));
+
+        RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
+        return Status::OK();
+    } else {
+        return exec_plan_fragment(params, empty_function);
+    }
 }
 
 Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) {
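
The new TPipelineFragmentParams overload above mirrors the transactional branch of the TExecPlanFragmentParams overload: when txn_conf.need_txn is set, it builds a StreamLoadContext, wires a single StreamLoadPipe in as both body_sink and pipe, registers the context with new_load_stream_mgr, and hands off to the stream-load executor; otherwise it falls through to the plain exec_plan_fragment. The pipe aliasing is the load-bearing detail. A minimal stand-alone sketch of that ownership pattern (stand-in types, not Doris code):

    #include <iostream>
    #include <memory>

    struct StreamPipe {                      // stand-in for io::StreamLoadPipe
        void append(const char* data) { std::cout << "buffered: " << data << "\n"; }
    };

    struct Context {                         // stand-in for StreamLoadContext
        std::shared_ptr<StreamPipe> body_sink;  // producer-facing alias
        std::shared_ptr<StreamPipe> pipe;       // consumer-facing alias
    };

    int main() {
        Context ctx;
        auto pipe = std::make_shared<StreamPipe>();
        ctx.body_sink = pipe;                // both aliases own the same buffer,
        ctx.pipe = pipe;                     // so whichever side finishes last keeps it alive
        ctx.body_sink->append("row-1");      // what the sender writes is exactly
        return 0;                            // what ctx.pipe hands to the scan node
    }
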
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 4db6d42e18..7837c240e0 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -213,8 +213,13 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     TStatus tstatus;
     tstatus.status_code = TStatusCode::OK;
     put_result.status = tstatus;
-    put_result.params = task.params;
-    put_result.__isset.params = true;
+    if (task.__isset.params) {
+        put_result.params = task.params;
+        put_result.__isset.params = true;
+    } else {
+        put_result.pipeline_params = task.pipeline_params;
+        put_result.__isset.pipeline_params = true;
+    }
     ctx->put_result = put_result;
     if (task.__isset.format) {
         ctx->format = task.format;
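
This either/or works because Thrift optional fields compile to plain C++ members plus a generated __isset flag block, so a task from an older FE simply leaves pipeline_params with its flag false. A hand-written stand-in sketch of that shape (the real structs are generated from the IDL):

    #include <iostream>
    #include <string>

    struct Isset { bool params = false; bool pipeline_params = false; };

    struct PutResult {
        std::string params;            // stand-in for TExecPlanFragmentParams
        std::string pipeline_params;   // stand-in for TPipelineFragmentParams
        Isset __isset;                 // Thrift generates a block like this
    };

    void dispatch(const PutResult& r) {
        if (r.__isset.params) {
            std::cout << "volcano path: " << r.params << "\n";
        } else {
            std::cout << "pipeline path: " << r.pipeline_params << "\n";
        }
    }

    int main() {
        PutResult r;
        r.pipeline_params = "plan-from-new-FE";
        r.__isset.pipeline_params = true;  // the sender must set the flag too
        dispatch(r);
    }
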
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index dc03195c3e..05c68941b9 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -69,77 +69,154 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
     ctx->start_write_data_nanos = MonotonicNanos();
     LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id
               << ", query_id=" << print_id(ctx->put_result.params.params.query_id);
-    auto st = _exec_env->fragment_mgr()->exec_plan_fragment(
-            ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) {
-                ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
-                ctx->commit_infos = std::move(state->tablet_commit_infos());
-                if (status->ok()) {
-                    ctx->number_total_rows = state->num_rows_load_total();
-                    ctx->number_loaded_rows = state->num_rows_load_success();
-                    ctx->number_filtered_rows = state->num_rows_load_filtered();
-                    ctx->number_unselected_rows = state->num_rows_load_unselected();
-
-                    int64_t num_selected_rows =
-                            ctx->number_total_rows - ctx->number_unselected_rows;
-                    if (num_selected_rows > 0 &&
-                        (double)ctx->number_filtered_rows / num_selected_rows >
-                                ctx->max_filter_ratio) {
-                        // NOTE: Do not modify the error message here, for historical reasons,
-                        // some users may rely on this error message.
-                        *status = Status::InternalError("too many filtered rows");
+    Status st;
+    if (ctx->put_result.__isset.params) {
+        st = _exec_env->fragment_mgr()->exec_plan_fragment(
+                ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) {
+                    ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
+                    ctx->commit_infos = std::move(state->tablet_commit_infos());
+                    if (status->ok()) {
+                        ctx->number_total_rows = state->num_rows_load_total();
+                        ctx->number_loaded_rows = state->num_rows_load_success();
+                        ctx->number_filtered_rows = state->num_rows_load_filtered();
+                        ctx->number_unselected_rows = state->num_rows_load_unselected();
+
+                        int64_t num_selected_rows =
+                                ctx->number_total_rows - ctx->number_unselected_rows;
+                        if (num_selected_rows > 0 &&
+                            (double)ctx->number_filtered_rows / num_selected_rows >
+                                    ctx->max_filter_ratio) {
+                            // NOTE: Do not modify the error message here, for historical reasons,
+                            // some users may rely on this error message.
+                            *status = Status::InternalError("too many filtered rows");
+                        }
+                        if (ctx->number_filtered_rows > 0 &&
+                            !state->get_error_log_file_path().empty()) {
+                            ctx->error_url =
+                                    to_load_error_http_path(state->get_error_log_file_path());
+                        }
+
+                        if (status->ok()) {
+                            DorisMetrics::instance()->stream_receive_bytes_total->increment(
+                                    ctx->receive_bytes);
+                            DorisMetrics::instance()->stream_load_rows_total->increment(
+                                    ctx->number_loaded_rows);
+                        }
+                    } else {
+                        LOG(WARNING)
+                                << "fragment execute failed"
+                                << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
+                                << ", err_msg=" << status->to_string() << ", " << ctx->brief();
+                        // cancel body_sink so the sender knows about it
+                        if (ctx->body_sink != nullptr) {
+                            ctx->body_sink->cancel(status->to_string());
+                        }
+
+                        switch (ctx->load_src_type) {
+                        // reset the stream load ctx's kafka commit offset
+                        case TLoadSourceType::KAFKA:
+                            ctx->kafka_info->reset_offset();
+                            break;
+                        default:
+                            break;
+                        }
                     }
-                    if (ctx->number_filtered_rows > 0 &&
-                        !state->get_error_log_file_path().empty()) {
-                        ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
+                    ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
+                    ctx->promise.set_value(*status);
+
+                    if (!status->ok() && ctx->body_sink != nullptr) {
+                        // In some cases, the load execution is exited early.
+                        // For example, when max_filter_ratio is 0 and illegal data is encountered
+                        // during stream loading, the entire load process is terminated early.
+                        // However, the http connection may still be sending data to stream_load_pipe
+                        // and waiting for it to be consumed.
+                        // Therefore, we need to actively cancel to end the pipe.
+                        ctx->body_sink->cancel(status->to_string());
                     }
 
+                    if (ctx->need_commit_self && ctx->body_sink != nullptr) {
+                        if (ctx->body_sink->cancelled() || !status->ok()) {
+                            ctx->status = *status;
+                            this->rollback_txn(ctx.get());
+                        } else {
+                            this->commit_txn(ctx.get());
+                        }
+                    }
+                });
+    } else {
+        st = _exec_env->fragment_mgr()->exec_plan_fragment(
+                ctx->put_result.pipeline_params, [ctx, this](RuntimeState* state, Status* status) {
+                    ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
+                    ctx->commit_infos = std::move(state->tablet_commit_infos());
                     if (status->ok()) {
-                        DorisMetrics::instance()->stream_receive_bytes_total->increment(
-                                ctx->receive_bytes);
-                        DorisMetrics::instance()->stream_load_rows_total->increment(
-                                ctx->number_loaded_rows);
+                        ctx->number_total_rows = state->num_rows_load_total();
+                        ctx->number_loaded_rows = state->num_rows_load_success();
+                        ctx->number_filtered_rows = state->num_rows_load_filtered();
+                        ctx->number_unselected_rows = state->num_rows_load_unselected();
+
+                        int64_t num_selected_rows =
+                                ctx->number_total_rows - ctx->number_unselected_rows;
+                        if (num_selected_rows > 0 &&
+                            (double)ctx->number_filtered_rows / num_selected_rows >
+                                    ctx->max_filter_ratio) {
+                            // NOTE: Do not modify the error message here, for historical reasons,
+                            // some users may rely on this error message.
+                            *status = Status::InternalError("too many filtered rows");
+                        }
+                        if (ctx->number_filtered_rows > 0 &&
+                            !state->get_error_log_file_path().empty()) {
+                            ctx->error_url =
+                                    to_load_error_http_path(state->get_error_log_file_path());
+                        }
+
+                        if (status->ok()) {
+                            DorisMetrics::instance()->stream_receive_bytes_total->increment(
+                                    ctx->receive_bytes);
+                            DorisMetrics::instance()->stream_load_rows_total->increment(
+                                    ctx->number_loaded_rows);
+                        }
+                    } else {
+                        LOG(WARNING)
+                                << "fragment execute failed"
+                                << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
+                                << ", err_msg=" << status->to_string() << ", " << ctx->brief();
+                        // cancel body_sink so the sender knows about it
+                        if (ctx->body_sink != nullptr) {
+                            ctx->body_sink->cancel(status->to_string());
+                        }
+
+                        switch (ctx->load_src_type) {
+                        // reset the stream load ctx's kafka commit offset
+                        case TLoadSourceType::KAFKA:
+                            ctx->kafka_info->reset_offset();
+                            break;
+                        default:
+                            break;
+                        }
                     }
-                } else {
-                    LOG(WARNING) << "fragment execute failed"
-                                 << ", query_id="
-                                 << UniqueId(ctx->put_result.params.params.query_id)
-                                 << ", err_msg=" << status->to_string() << ", " << ctx->brief();
-                    // cancel body_sink, make sender known it
-                    if (ctx->body_sink != nullptr) {
+                    ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
+                    ctx->promise.set_value(*status);
+
+                    if (!status->ok() && ctx->body_sink != nullptr) {
+                        // In some cases, the load execution is exited early.
+                        // For example, when max_filter_ratio is 0 and illegal data is encountered
+                        // during stream loading, the entire load process is terminated early.
+                        // However, the http connection may still be sending data to stream_load_pipe
+                        // and waiting for it to be consumed.
+                        // Therefore, we need to actively cancel to end the pipe.
                         ctx->body_sink->cancel(status->to_string());
                     }
 
-                    switch (ctx->load_src_type) {
-                    // reset the stream load ctx's kafka commit offset
-                    case TLoadSourceType::KAFKA:
-                        ctx->kafka_info->reset_offset();
-                        break;
-                    default:
-                        break;
-                    }
-                }
-                ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
-                ctx->promise.set_value(*status);
-
-                if (!status->ok() && ctx->body_sink != nullptr) {
-                    // In some cases, the load execution is exited early.
-                    // For example, when max_filter_ratio is 0 and illegal data is encountered
-                    // during stream loading, the entire load process is terminated early.
-                    // However, the http connection may still be sending data to stream_load_pipe
-                    // and waiting for it to be consumed.
-                    // Therefore, we need to actively cancel to end the pipe.
-                    ctx->body_sink->cancel(status->to_string());
-                }
-
-                if (ctx->need_commit_self && ctx->body_sink != nullptr) {
-                    if (ctx->body_sink->cancelled() || !status->ok()) {
-                        ctx->status = *status;
-                        this->rollback_txn(ctx.get());
-                    } else {
-                        this->commit_txn(ctx.get());
+                    if (ctx->need_commit_self && ctx->body_sink != nullptr) {
+                        if (ctx->body_sink->cancelled() || !status->ok()) {
+                            ctx->status = *status;
+                            this->rollback_txn(ctx.get());
+                        } else {
+                            this->commit_txn(ctx.get());
+                        }
                     }
-                }
-            });
+                });
+    }
     if (!st.ok()) {
         // no need to check unref's return value
         return st;
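
Worth noting: the completion callbacks installed by the two branches above are byte-for-byte identical, and the warning log in the pipeline branch still prints ctx->put_result.params.params.query_id rather than the query id from pipeline_params. Extracting the callback into one helper would keep the paths from drifting; an illustrative stand-alone sketch (hypothetical names, not part of this patch):

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>

    struct Ctx { std::string label; };       // stand-in for StreamLoadContext
    using FinishCb = std::function<void(bool ok)>;

    // Build the completion logic once; both overloads capture the same body.
    FinishCb make_finish_cb(std::shared_ptr<Ctx> ctx) {
        return [ctx](bool ok) {
            std::cout << ctx->label << (ok ? ": commit" : ": rollback") << "\n";
        };
    }

    void exec_volcano(const std::string& plan, const FinishCb& cb) { cb(true); }
    void exec_pipeline(const std::string& plan, const FinishCb& cb) { cb(true); }

    int main() {
        auto ctx = std::make_shared<Ctx>(Ctx{"job-1"});
        exec_volcano("volcano-plan", make_finish_cb(ctx));   // one definition,
        exec_pipeline("pipeline-plan", make_finish_cb(ctx)); // two call sites
    }
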
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index fa4be7855a..f11d2ad373 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -20,12 +20,14 @@ package org.apache.doris.load.routineload;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TKafkaLoadInfo;
 import org.apache.doris.thrift.TLoadSourceType;
+import org.apache.doris.thrift.TPipelineFragmentParams;
 import org.apache.doris.thrift.TPlanFragment;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TUniqueId;
@@ -87,7 +89,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         tKafkaLoadInfo.setProperties(routineLoadJob.getConvertedCustomProperties());
         tRoutineLoadTask.setKafkaLoadInfo(tKafkaLoadInfo);
         tRoutineLoadTask.setType(TLoadSourceType.KAFKA);
-        tRoutineLoadTask.setParams(rePlan(routineLoadJob));
+        if (Config.enable_pipeline_load) {
+            tRoutineLoadTask.setPipelineParams(rePlanForPipeline(routineLoadJob));
+        } else {
+            tRoutineLoadTask.setParams(rePlan(routineLoadJob));
+        }
         tRoutineLoadTask.setMaxIntervalS(routineLoadJob.getMaxBatchIntervalS());
         tRoutineLoadTask.setMaxBatchRows(routineLoadJob.getMaxBatchRows());
         tRoutineLoadTask.setMaxBatchSize(routineLoadJob.getMaxBatchSizeBytes());
@@ -120,6 +126,15 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         return tExecPlanFragmentParams;
     }
 
+    private TPipelineFragmentParams rePlanForPipeline(RoutineLoadJob routineLoadJob) throws UserException {
+        TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
+        // plan for each task, in case the table has changed (rollup or schema change)
+        TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.planForPipeline(loadId, txnId);
+        TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
+        tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+        return tExecPlanFragmentParams;
+    }
+
     // implement method for compatibility
     public String getHeaderType() {
         return "";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 5d4d2ecf8c..56186cbda2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -61,6 +61,7 @@ import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPipelineFragmentParams;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
 import org.apache.doris.transaction.TransactionException;
@@ -829,6 +830,26 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         }
     }
 
+    public TPipelineFragmentParams planForPipeline(TUniqueId loadId, long txnId) throws UserException {
+        Preconditions.checkNotNull(planner);
+        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
+        Table table = db.getTableOrMetaException(tableId, Table.TableType.OLAP);
+        table.readLock();
+        try {
+            TPipelineFragmentParams planParams = planner.planForPipeline(loadId);
+            // add table indexes to transaction state
+            TransactionState txnState = Env.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), txnId);
+            if (txnState == null) {
+                throw new MetaNotFoundException("txn does not exist: " + txnId);
+            }
+            txnState.addTableIndexes(planner.getDestTable());
+
+            return planParams;
+        } finally {
+            table.readUnlock();
+        }
+    }
+
     // if task not exists, before aborted will reset the txn attachment to null, task will not be updated
     // if task pass the checker, task will be updated by attachment
     // *** Please do not call before individually. It must be combined use with after ***
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index e5789e784f..ce65cf3564 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -54,6 +54,8 @@ import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPipelineFragmentParams;
+import org.apache.doris.thrift.TPipelineInstanceParams;
 import org.apache.doris.thrift.TPlanFragmentExecParams;
 import org.apache.doris.thrift.TQueryGlobals;
 import org.apache.doris.thrift.TQueryOptions;
@@ -309,6 +311,206 @@ public class StreamLoadPlanner {
         return params;
     }
 
+    public TPipelineFragmentParams planForPipeline(TUniqueId loadId) throws UserException {
+        if (destTable.getKeysType() != KeysType.UNIQUE_KEYS
+                && taskInfo.getMergeType() != LoadTask.MergeType.APPEND) {
+            throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables.");
+        }
+        if (taskInfo.getMergeType() != LoadTask.MergeType.APPEND
+                && !destTable.hasDeleteSign()) {
+            throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete.");
+        }
+
+        if (destTable.hasSequenceCol() && !taskInfo.hasSequenceCol() && destTable.getSequenceMapCol() == null) {
+            throw new UserException("Table " + destTable.getName()
+                    + " has sequence column, need to specify the sequence column");
+        }
+        if (!destTable.hasSequenceCol() && taskInfo.hasSequenceCol()) {
+            throw new UserException("There is no sequence column in the table " + destTable.getName());
+        }
+        resetAnalyzer();
+        // construct tuple descriptor, used for dataSink
+        tupleDesc = descTable.createTupleDescriptor("DstTableTuple");
+        TupleDescriptor scanTupleDesc = tupleDesc;
+        // note: we use two tuples separately for Scan and Sink here to avoid wrong nullable info.
+        // construct tuple descriptor, used for scanNode
+        scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
+        boolean negative = taskInfo.getNegative();
+        // get partial update related info
+        boolean isPartialUpdate = taskInfo.isPartialUpdate();
+        if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
+            throw new UserException("Only unique key merge on write support partial update");
+        }
+        HashSet<String> partialUpdateInputColumns = new HashSet<>();
+        if (isPartialUpdate) {
+            for (Column col : destTable.getFullSchema()) {
+                boolean existInExpr = false;
+                for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) {
+                    if (importColumnDesc.getColumnName() != null
+                            && importColumnDesc.getColumnName().equals(col.getName())) {
+                        if (!col.isVisible()) {
+                            throw new UserException("Partial update should not include invisible column: "
+                                    + col.getName());
+                        }
+                        partialUpdateInputColumns.add(col.getName());
+                        existInExpr = true;
+                        break;
+                    }
+                }
+                if (col.isKey() && !existInExpr) {
+                    throw new UserException("Partial update should include all key columns, missing: " + col.getName());
+                }
+            }
+        }
+        // here we should be full schema to fill the descriptor table
+        for (Column col : destTable.getFullSchema()) {
+            if (isPartialUpdate && !partialUpdateInputColumns.contains(col.getName())) {
+                continue;
+            }
+            SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc);
+            slotDesc.setIsMaterialized(true);
+            slotDesc.setColumn(col);
+            slotDesc.setIsNullable(col.isAllowNull());
+            SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc);
+            scanSlotDesc.setIsMaterialized(true);
+            scanSlotDesc.setColumn(col);
+            scanSlotDesc.setIsNullable(col.isAllowNull());
+            for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) {
+                try {
+                    if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null
+                            && importColumnDesc.getColumnName().equals(col.getName())) {
+                        scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable());
+                        break;
+                    }
+                } catch (Exception e) {
+                    // An exception may be thrown here because `importColumnDesc.getExpr()` has not been analyzed yet.
+                    // We just skip this case here.
+                }
+            }
+            if (negative && !col.isKey() && col.getAggregationType() != AggregateType.SUM) {
+                throw new DdlException("Column is not SUM AggregateType. column:" + col.getName());
+            }
+        }
+
+        // Plan scan tuple of dynamic table
+        if (destTable.isDynamicSchema()) {
+            descTable.addReferencedTable(destTable);
+            scanTupleDesc.setTable(destTable);
+            // add an implicit container column "DORIS_DYNAMIC_COL" for dynamic columns
+            SlotDescriptor slotDesc = descTable.addSlotDescriptor(scanTupleDesc);
+            Column col = new Column(Column.DYNAMIC_COLUMN_NAME, Type.VARIANT, false, null, false, "",
+                    "stream load auto dynamic column");
+            slotDesc.setIsMaterialized(true);
+            slotDesc.setColumn(col);
+            // Non-nullable slots will have 0 for the byte offset and -1 for the bit mask
+            slotDesc.setNullIndicatorBit(-1);
+            slotDesc.setNullIndicatorByte(0);
+            slotDesc.setIsNullable(false);
+            LOG.debug("plan tupleDesc {}", scanTupleDesc.toString());
+        }
+
+        // create scan node
+        FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), scanTupleDesc);
+        // 1. create file group
+        DataDescription dataDescription = new DataDescription(destTable.getName(), taskInfo);
+        dataDescription.analyzeWithoutCheckPriv(db.getFullName());
+        BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription);
+        fileGroup.parse(db, dataDescription);
+        // 2. create dummy file status
+        TBrokerFileStatus fileStatus = new TBrokerFileStatus();
+        if (taskInfo.getFileType() == TFileType.FILE_LOCAL) {
+            fileStatus.setPath(taskInfo.getPath());
+            fileStatus.setIsDir(false);
+            fileStatus.setSize(taskInfo.getFileSize()); // for a local file, set the real file size.
+        } else {
+            fileStatus.setPath("");
+            fileStatus.setIsDir(false);
+            fileStatus.setSize(-1); // must set to -1, means stream.
+        }
+        // The load id will be passed to the csv reader to find the stream load context in the new load stream manager
+        fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(),
+                fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns(),
+                taskInfo.isPartialUpdate());
+        scanNode = fileScanNode;
+
+        scanNode.init(analyzer);
+        scanNode.finalize(analyzer);
+        descTable.computeStatAndMemLayout();
+
+        int timeout = taskInfo.getTimeout();
+        if (taskInfo instanceof RoutineLoadJob) {
+            // For routine load, make the timeout of the plan fragment larger than the MaxIntervalS config,
+            // so that the execution won't be killed before consuming has finished.
+            timeout *= 2;
+        }
+
+        // create dest sink
+        List<Long> partitionIds = getAllPartitionIds();
+        OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds,
+                Config.enable_single_replica_load);
+        olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
+                taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet());
+        olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);
+        olapTableSink.complete();
+
+        // for stream load, we only need one fragment, ScanNode -> DataSink.
+        // OlapTableSink can dispatch data to corresponding node.
+        PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.UNPARTITIONED);
+        fragment.setSink(olapTableSink);
+
+        fragment.finalize(null);
+
+        TPipelineFragmentParams pipParams = new TPipelineFragmentParams();
+        pipParams.setProtocolVersion(PaloInternalServiceVersion.V1);
+        pipParams.setFragment(fragment.toThrift());
+
+        pipParams.setDescTbl(analyzer.getDescTbl().toThrift());
+        pipParams.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+        pipParams.setQueryId(loadId);
+        pipParams.per_exch_num_senders = Maps.newHashMap();
+        pipParams.destinations = Lists.newArrayList();
+        pipParams.setNumSenders(1);
+
+        TPipelineInstanceParams localParams = new TPipelineInstanceParams();
+        localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + 1));
+
+        Map<Integer, List<TScanRangeParams>> perNodeScanRange = Maps.newHashMap();
+        List<TScanRangeParams> scanRangeParams = Lists.newArrayList();
+        for (TScanRangeLocations locations : scanNode.getScanRangeLocations(0)) {
+            scanRangeParams.add(new TScanRangeParams(locations.getScanRange()));
+        }
+        // For stream load, only one sender
+        localParams.setSenderId(0);
+        perNodeScanRange.put(scanNode.getId().asInt(), scanRangeParams);
+        localParams.setPerNodeScanRanges(perNodeScanRange);
+        pipParams.setLocalParams(Lists.newArrayList());
+        pipParams.getLocalParams().add(localParams);
+        TQueryOptions queryOptions = new TQueryOptions();
+        queryOptions.setQueryType(TQueryType.LOAD);
+        queryOptions.setQueryTimeout(timeout);
+        queryOptions.setExecutionTimeout(timeout);
+        queryOptions.setMemLimit(taskInfo.getMemLimit());
+        // for stream load, we use exec_mem_limit to limit the memory usage of load channel.
+        queryOptions.setLoadMemLimit(taskInfo.getMemLimit());
+        //load
+        queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
+        queryOptions.setBeExecVersion(Config.be_exec_version);
+        queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
+
+        pipParams.setQueryOptions(queryOptions);
+        TQueryGlobals queryGlobals = new TQueryGlobals();
+        queryGlobals.setNowString(TimeUtils.DATETIME_FORMAT.format(LocalDateTime.now()));
+        queryGlobals.setTimestampMs(System.currentTimeMillis());
+        queryGlobals.setTimeZone(taskInfo.getTimezone());
+        queryGlobals.setLoadZeroTolerance(taskInfo.getMaxFilterRatio() <= 0.0);
+        queryGlobals.setNanoSeconds(LocalDateTime.now().getNano());
+
+        pipParams.setQueryGlobals(queryGlobals);
+
+        // LOG.debug("stream load txn id: {}, plan: {}", streamLoadTask.getTxnId(), params);
+        return pipParams;
+    }
+
     // get all specified partition ids.
     // if no partition specified, return null
     private List<Long> getAllPartitionIds() throws DdlException, AnalysisException {
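
Compared with plan(), planForPipeline() carries the single stream-load instance in one TPipelineInstanceParams (sender id 0) and derives the fragment instance id from the load id by keeping the high 64 bits and bumping the low ones (new TUniqueId(loadId.hi, loadId.lo + 1)). A stand-alone sketch of that derivation:

    #include <cstdint>
    #include <cstdio>

    struct UniqueId { int64_t hi; int64_t lo; };   // stand-in for TUniqueId

    // Instance id = load id with the low half incremented, as in the planner.
    UniqueId instance_id_for(UniqueId load_id) {
        return UniqueId{load_id.hi, load_id.lo + 1};
    }

    int main() {
        UniqueId load_id{0x1234, 7};
        UniqueId inst = instance_id_for(load_id);
        std::printf("%lld-%lld\n", (long long)inst.hi, (long long)inst.lo);  // 4660-8
    }
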
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index 6f156030e7..77d4c52ac9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.proto.InternalService;
@@ -33,6 +34,8 @@ import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPipelineFragmentParams;
+import org.apache.doris.thrift.TPipelineFragmentParamsList;
 import org.apache.doris.thrift.TScanRangeParams;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
@@ -66,44 +69,87 @@ public class InsertStreamTxnExecutor {
         StreamLoadPlanner planner = new StreamLoadPlanner(
                 txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask);
         // Will using load id as query id in fragment
-        TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
-        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
-        List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
-        if (beIds.isEmpty()) {
-            throw new UserException("No available backend to match the policy: " + policy);
-        }
+        if (Config.enable_pipeline_load) {
+            TPipelineFragmentParams tRequest = planner.planForPipeline(streamLoadTask.getId());
+            BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
+            List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+            if (beIds.isEmpty()) {
+                throw new UserException("No available backend to match the policy: " + policy);
+            }
 
-        tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
-        for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
-            for (TScanRangeParams scanRangeParams : entry.getValue()) {
-                scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
-                        TFileFormatType.FORMAT_PROTO);
-                scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
-                        TFileCompressType.PLAIN);
+            tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
+            for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.local_params.get(0)
+                    .per_node_scan_ranges.entrySet()) {
+                for (TScanRangeParams scanRangeParams : entry.getValue()) {
+                    scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+                            TFileFormatType.FORMAT_PROTO);
+                    scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+                            TFileCompressType.PLAIN);
+                }
+            }
+            txnConf.setFragmentInstanceId(tRequest.local_params.get(0).fragment_instance_id);
+            this.loadId = request.getLoadId();
+            this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+                    .setHi(loadId.getHi())
+                    .setLo(loadId.getLo()).build());
+
+            Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
+            txnConf.setUserIp(backend.getHost());
+            txnEntry.setBackend(backend);
+            TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+            try {
+                TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList();
+                paramsList.addToParamsList(tRequest);
+                Future<InternalService.PExecPlanFragmentResult> future =
+                        BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false);
+                InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
+                TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+                if (code != TStatusCode.OK) {
+                    throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
+                }
+            } catch (RpcException e) {
+                throw new TException(e);
+            }
+        } else {
+            TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
+            BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
+            List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+            if (beIds.isEmpty()) {
+                throw new UserException("No available backend to match the policy: " + policy);
             }
-        }
-        txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
-        this.loadId = request.getLoadId();
-        this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
-                .setHi(loadId.getHi())
-                .setLo(loadId.getLo()).build());
 
-        Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
-        txnConf.setUserIp(backend.getHost());
-        txnEntry.setBackend(backend);
-        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
-        try {
-            TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
-            paramsList.addToParamsList(tRequest);
-            Future<InternalService.PExecPlanFragmentResult> future =
-                    BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false);
-            InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
+            tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
+            for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
+                for (TScanRangeParams scanRangeParams : entry.getValue()) {
+                    scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+                            TFileFormatType.FORMAT_PROTO);
+                    scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+                            TFileCompressType.PLAIN);
+                }
+            }
+            txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
+            this.loadId = request.getLoadId();
+            this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+                    .setHi(loadId.getHi())
+                    .setLo(loadId.getLo()).build());
+
+            Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
+            txnConf.setUserIp(backend.getHost());
+            txnEntry.setBackend(backend);
+            TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+            try {
+                TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
+                paramsList.addToParamsList(tRequest);
+                Future<InternalService.PExecPlanFragmentResult> future =
+                        BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false);
+                InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
+                TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+                if (code != TStatusCode.OK) {
+                    throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
+                }
+            } catch (RpcException e) {
+                throw new TException(e);
             }
-        } catch (RpcException e) {
-            throw new TException(e);
         }
     }
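
Both branches share the same submission shape: send the plan with execPlanFragmentsAsync, bound the wait with future.get(5, TimeUnit.SECONDS), and turn a non-OK status into a TException. The same bounded-wait pattern in stand-alone C++ (stand-in types; the 5-second budget matches the Java code):

    #include <chrono>
    #include <future>
    #include <stdexcept>
    #include <string>

    struct ExecResult { bool ok; std::string err; };

    ExecResult submit_fragment() {            // stand-in for the brpc submission
        return ExecResult{true, ""};
    }

    int main() {
        auto fut = std::async(std::launch::async, submit_fragment);
        if (fut.wait_for(std::chrono::seconds(5)) != std::future_status::ready) {
            throw std::runtime_error("timed out waiting for plan fragment");
        }
        ExecResult r = fut.get();
        if (!r.ok) {
            throw std::runtime_error("failed to execute plan fragment: " + r.err);
        }
    }
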
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 8f21f64b2f..b650100823 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -127,6 +127,7 @@ import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TMasterResult;
 import org.apache.doris.thrift.TMySqlLoadAcquireTokenResult;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPipelineFragmentParams;
 import org.apache.doris.thrift.TPrivilegeCtrl;
 import org.apache.doris.thrift.TPrivilegeHier;
 import org.apache.doris.thrift.TPrivilegeStatus;
@@ -1568,7 +1569,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         TStatus status = new TStatus(TStatusCode.OK);
         result.setStatus(status);
         try {
-            result.setParams(streamLoadPutImpl(request));
+            if (Config.enable_pipeline_load) {
+                result.setPipelineParams(pipelineStreamLoadPutImpl(request));
+            } else {
+                result.setParams(streamLoadPutImpl(request));
+            }
         } catch (UserException e) {
             LOG.warn("failed to get stream load plan: {}", e.getMessage());
             status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
@@ -1621,6 +1626,45 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
     }
 
+    private TPipelineFragmentParams pipelineStreamLoadPutImpl(TStreamLoadPutRequest request) throws UserException {
+        String cluster = request.getCluster();
+        if (Strings.isNullOrEmpty(cluster)) {
+            cluster = SystemInfoService.DEFAULT_CLUSTER;
+        }
+
+        Env env = Env.getCurrentEnv();
+        String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb());
+        Database db = env.getInternalCatalog().getDbNullable(fullDbName);
+        if (db == null) {
+            String dbName = fullDbName;
+            if (Strings.isNullOrEmpty(request.getCluster())) {
+                dbName = request.getDb();
+            }
+            throw new UserException("unknown database, database=" + dbName);
+        }
+        long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000;
+        Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP);
+        if (!table.tryReadLock(timeoutMs, TimeUnit.MILLISECONDS)) {
+            throw new UserException(
+                    "get table read lock timeout, database=" + fullDbName + ",table=" + table.getName());
+        }
+        try {
+            StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
+            StreamLoadPlanner planner = new StreamLoadPlanner(db, (OlapTable) table, streamLoadTask);
+            TPipelineFragmentParams plan = planner.planForPipeline(streamLoadTask.getId());
+            // add table indexes to transaction state
+            TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
+                    .getTransactionState(db.getId(), request.getTxnId());
+            if (txnState == null) {
+                throw new UserException("txn does not exist: " + request.getTxnId());
+            }
+            txnState.addTableIndexes((OlapTable) table);
+            return plan;
+        } finally {
+            table.readUnlock();
+        }
+    }
+
     @Override
     public TStatus snapshotLoaderReport(TSnapshotLoaderReportRequest request) throws TException {
         if (Env.getCurrentEnv().getBackupHandler().report(request.getTaskType(), request.getJobId(),
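
pipelineStreamLoadPutImpl follows streamLoadPutImpl closely, including the bounded lock acquisition: the table read lock is taken with the client's Thrift RPC timeout (5000 ms by default) so a put request fails fast instead of queueing behind a long-running writer. A stand-alone C++ sketch of the same discipline with a timed shared lock:

    #include <chrono>
    #include <shared_mutex>
    #include <stdexcept>

    std::shared_timed_mutex table_lock;   // stand-in for the table's read-write lock

    void plan_with_deadline(std::chrono::milliseconds timeout) {
        // The timed shared_lock constructor calls try_lock_shared_for().
        std::shared_lock<std::shared_timed_mutex> lk(table_lock, timeout);
        if (!lk.owns_lock()) {
            throw std::runtime_error("get table read lock timeout");
        }
        // ... plan the load and register table indexes on the txn state ...
    }  // RAII release here plays the role of Java's finally { readUnlock(); }

    int main() { plan_with_deadline(std::chrono::milliseconds(5000)); }
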
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 60beb0b27e..62f7909304 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -65,6 +65,7 @@ struct TRoutineLoadTask {
     12: optional TKafkaLoadInfo kafka_load_info
     13: optional PaloInternalService.TExecPlanFragmentParams params
     14: optional PlanNodes.TFileFormatType format
+    15: optional PaloInternalService.TPipelineFragmentParams pipeline_params
 }
 
 struct TKafkaMetaProxyRequest {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 1f64965a1c..5aecdc4ba2 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -609,6 +609,7 @@ struct TStreamLoadPutResult {
     1: required Status.TStatus status
     // valid when status is OK
     2: optional PaloInternalService.TExecPlanFragmentParams params
+    3: optional PaloInternalService.TPipelineFragmentParams pipeline_params
 }
 
 struct TKafkaRLTaskProgress {
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 0c1a4feee8..734d70bb1a 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -625,6 +625,7 @@ struct TPipelineFragmentParams {
   23: optional Planner.TPlanFragment fragment
   24: list<TPipelineInstanceParams> local_params
   26: optional list<TPipelineWorkloadGroup> workload_groups
+  27: optional TTxnParams txn_conf
 }
 
 struct TPipelineFragmentParamsList {

