You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2023/06/01 03:42:50 UTC
[doris] branch master updated: [pipeline](load) support pipeline load (#20217)
This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4387f47fb5 [pipeline](load) support pipeline load (#20217)
4387f47fb5 is described below
commit 4387f47fb52a9e4813e0b1551bc173d9d7186ad4
Author: Gabriel <ga...@gmail.com>
AuthorDate: Thu Jun 1 11:42:43 2023 +0800
[pipeline](load) support pipeline load (#20217)
---
be/src/runtime/fragment_mgr.cpp | 36 +++-
.../routine_load/routine_load_task_executor.cpp | 9 +-
.../runtime/stream_load/stream_load_executor.cpp | 201 +++++++++++++-------
.../doris/load/routineload/KafkaTaskInfo.java | 17 +-
.../doris/load/routineload/RoutineLoadJob.java | 21 +++
.../apache/doris/planner/StreamLoadPlanner.java | 202 +++++++++++++++++++++
.../apache/doris/qe/InsertStreamTxnExecutor.java | 114 ++++++++----
.../apache/doris/service/FrontendServiceImpl.java | 46 ++++-
gensrc/thrift/BackendService.thrift | 1 +
gensrc/thrift/FrontendService.thrift | 1 +
gensrc/thrift/PaloInternalService.thrift | 1 +
11 files changed, 547 insertions(+), 102 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 5ac0554104..26646bbd0e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -559,6 +559,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
stream_load_ctx->txn_id = params.txn_conf.txn_id;
stream_load_ctx->id = UniqueId(params.params.query_id);
stream_load_ctx->put_result.params = params;
+ stream_load_ctx->put_result.__isset.params = true;
stream_load_ctx->use_streaming = true;
stream_load_ctx->load_type = TLoadType::MANUL_LOAD;
stream_load_ctx->load_src_type = TLoadSourceType::RAW;
@@ -586,8 +587,39 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
}
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params) {
- // TODO
- return exec_plan_fragment(params, empty_function);
+ // Transactional insert (txn_conf.need_txn): wrap the fragment in a StreamLoadContext and hand
+ // it to the stream load executor; otherwise execute the fragment directly.
+ if (params.txn_conf.need_txn) {
+ std::shared_ptr<StreamLoadContext> stream_load_ctx =
+ std::make_shared<StreamLoadContext>(_exec_env);
+ stream_load_ctx->db = params.txn_conf.db;
+ stream_load_ctx->db_id = params.txn_conf.db_id;
+ stream_load_ctx->table = params.txn_conf.tbl;
+ stream_load_ctx->txn_id = params.txn_conf.txn_id;
+ stream_load_ctx->id = UniqueId(params.query_id);
+ stream_load_ctx->put_result.pipeline_params = params;
+ // Mark the field as set, like put_result.__isset.params in the TExecPlanFragmentParams overload.
+ stream_load_ctx->put_result.__isset.pipeline_params = true;
+ stream_load_ctx->use_streaming = true;
+ stream_load_ctx->load_type = TLoadType::MANUL_LOAD;
+ stream_load_ctx->load_src_type = TLoadSourceType::RAW;
+ stream_load_ctx->label = params.import_label;
+ stream_load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
+ stream_load_ctx->timeout_second = 3600;
+ stream_load_ctx->auth.token = params.txn_conf.token;
+ stream_load_ctx->need_commit_self = true;
+ stream_load_ctx->need_rollback = true;
+ auto pipe = std::make_shared<io::StreamLoadPipe>(
+ io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
+ -1 /* total_length */, true /* use_proto */);
+ stream_load_ctx->body_sink = pipe;
+ stream_load_ctx->pipe = pipe;
+ stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
+
+ // Register the context under its id before execution so the load can find it.
+ RETURN_IF_ERROR(
+ _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, stream_load_ctx));
+
+ return _exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx);
+ } else {
+ return exec_plan_fragment(params, empty_function);
+ }
}
Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) {
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 4db6d42e18..7837c240e0 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -213,8 +213,13 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
TStatus tstatus;
tstatus.status_code = TStatusCode::OK;
put_result.status = tstatus;
- put_result.params = task.params;
- put_result.__isset.params = true;
+ if (task.__isset.params) {
+ put_result.params = task.params;
+ put_result.__isset.params = true;
+ } else {
+ put_result.pipeline_params = task.pipeline_params;
+ put_result.__isset.pipeline_params = true;
+ }
ctx->put_result = put_result;
if (task.__isset.format) {
ctx->format = task.format;
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index dc03195c3e..05c68941b9 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -69,77 +69,154 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
ctx->start_write_data_nanos = MonotonicNanos();
- LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id
- << ", query_id=" << print_id(ctx->put_result.params.params.query_id);
+ // Pipeline loads carry the query id in put_result.pipeline_params rather than params.params.
+ LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id
+ << ", query_id="
+ << print_id(ctx->put_result.__isset.params ? ctx->put_result.params.params.query_id
+ : ctx->put_result.pipeline_params.query_id);
- auto st = _exec_env->fragment_mgr()->exec_plan_fragment(
- ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) {
- ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
- ctx->commit_infos = std::move(state->tablet_commit_infos());
- if (status->ok()) {
- ctx->number_total_rows = state->num_rows_load_total();
- ctx->number_loaded_rows = state->num_rows_load_success();
- ctx->number_filtered_rows = state->num_rows_load_filtered();
- ctx->number_unselected_rows = state->num_rows_load_unselected();
-
- int64_t num_selected_rows =
- ctx->number_total_rows - ctx->number_unselected_rows;
- if (num_selected_rows > 0 &&
- (double)ctx->number_filtered_rows / num_selected_rows >
- ctx->max_filter_ratio) {
- // NOTE: Do not modify the error message here, for historical reasons,
- // some users may rely on this error message.
- *status = Status::InternalError("too many filtered rows");
+ Status st;
+ if (ctx->put_result.__isset.params) {
+ st = _exec_env->fragment_mgr()->exec_plan_fragment(
+ ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) {
+ ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
+ ctx->commit_infos = std::move(state->tablet_commit_infos());
+ if (status->ok()) {
+ ctx->number_total_rows = state->num_rows_load_total();
+ ctx->number_loaded_rows = state->num_rows_load_success();
+ ctx->number_filtered_rows = state->num_rows_load_filtered();
+ ctx->number_unselected_rows = state->num_rows_load_unselected();
+
+ int64_t num_selected_rows =
+ ctx->number_total_rows - ctx->number_unselected_rows;
+ if (num_selected_rows > 0 &&
+ (double)ctx->number_filtered_rows / num_selected_rows >
+ ctx->max_filter_ratio) {
+ // NOTE: Do not modify the error message here, for historical reasons,
+ // some users may rely on this error message.
+ *status = Status::InternalError("too many filtered rows");
+ }
+ if (ctx->number_filtered_rows > 0 &&
+ !state->get_error_log_file_path().empty()) {
+ ctx->error_url =
+ to_load_error_http_path(state->get_error_log_file_path());
+ }
+
+ if (status->ok()) {
+ DorisMetrics::instance()->stream_receive_bytes_total->increment(
+ ctx->receive_bytes);
+ DorisMetrics::instance()->stream_load_rows_total->increment(
+ ctx->number_loaded_rows);
+ }
+ } else {
+ LOG(WARNING)
+ << "fragment execute failed"
+ << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
+ << ", err_msg=" << status->to_string() << ", " << ctx->brief();
+ // cancel body_sink, make sender known it
+ if (ctx->body_sink != nullptr) {
+ ctx->body_sink->cancel(status->to_string());
+ }
+
+ switch (ctx->load_src_type) {
+ // reset the stream load ctx's kafka commit offset
+ case TLoadSourceType::KAFKA:
+ ctx->kafka_info->reset_offset();
+ break;
+ default:
+ break;
+ }
}
- if (ctx->number_filtered_rows > 0 &&
- !state->get_error_log_file_path().empty()) {
- ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
+ ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
+ ctx->promise.set_value(*status);
+
+ if (!status->ok() && ctx->body_sink != nullptr) {
+ // In some cases, the load execution is exited early.
+ // For example, when max_filter_ratio is 0 and illegal data is encountered
+ // during stream loading, the entire load process is terminated early.
+ // However, the http connection may still be sending data to stream_load_pipe
+ // and waiting for it to be consumed.
+ // Therefore, we need to actively cancel to end the pipe.
+ ctx->body_sink->cancel(status->to_string());
}
+ if (ctx->need_commit_self && ctx->body_sink != nullptr) {
+ if (ctx->body_sink->cancelled() || !status->ok()) {
+ ctx->status = *status;
+ this->rollback_txn(ctx.get());
+ } else {
+ this->commit_txn(ctx.get());
+ }
+ }
+ });
+ } else {
+ st = _exec_env->fragment_mgr()->exec_plan_fragment(
+ ctx->put_result.pipeline_params, [ctx, this](RuntimeState* state, Status* status) {
+ ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
+ ctx->commit_infos = std::move(state->tablet_commit_infos());
if (status->ok()) {
- DorisMetrics::instance()->stream_receive_bytes_total->increment(
- ctx->receive_bytes);
- DorisMetrics::instance()->stream_load_rows_total->increment(
- ctx->number_loaded_rows);
+ ctx->number_total_rows = state->num_rows_load_total();
+ ctx->number_loaded_rows = state->num_rows_load_success();
+ ctx->number_filtered_rows = state->num_rows_load_filtered();
+ ctx->number_unselected_rows = state->num_rows_load_unselected();
+
+ int64_t num_selected_rows =
+ ctx->number_total_rows - ctx->number_unselected_rows;
+ if (num_selected_rows > 0 &&
+ (double)ctx->number_filtered_rows / num_selected_rows >
+ ctx->max_filter_ratio) {
+ // NOTE: Do not modify the error message here, for historical reasons,
+ // some users may rely on this error message.
+ *status = Status::InternalError("too many filtered rows");
+ }
+ if (ctx->number_filtered_rows > 0 &&
+ !state->get_error_log_file_path().empty()) {
+ ctx->error_url =
+ to_load_error_http_path(state->get_error_log_file_path());
+ }
+
+ if (status->ok()) {
+ DorisMetrics::instance()->stream_receive_bytes_total->increment(
+ ctx->receive_bytes);
+ DorisMetrics::instance()->stream_load_rows_total->increment(
+ ctx->number_loaded_rows);
+ }
+ } else {
+ // This callback belongs to the pipeline_params branch (params is unset), so log its query id.
+ LOG(WARNING)
+ << "fragment execute failed"
+ << ", query_id=" << UniqueId(ctx->put_result.pipeline_params.query_id)
+ << ", err_msg=" << status->to_string() << ", " << ctx->brief();
+ // cancel body_sink, make sender known it
+ if (ctx->body_sink != nullptr) {
+ ctx->body_sink->cancel(status->to_string());
+ }
+
+ switch (ctx->load_src_type) {
+ // reset the stream load ctx's kafka commit offset
+ case TLoadSourceType::KAFKA:
+ ctx->kafka_info->reset_offset();
+ break;
+ default:
+ break;
+ }
}
- } else {
- LOG(WARNING) << "fragment execute failed"
- << ", query_id="
- << UniqueId(ctx->put_result.params.params.query_id)
- << ", err_msg=" << status->to_string() << ", " << ctx->brief();
- // cancel body_sink, make sender known it
- if (ctx->body_sink != nullptr) {
+ ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
+ ctx->promise.set_value(*status);
+
+ if (!status->ok() && ctx->body_sink != nullptr) {
+ // In some cases, the load execution is exited early.
+ // For example, when max_filter_ratio is 0 and illegal data is encountered
+ // during stream loading, the entire load process is terminated early.
+ // However, the http connection may still be sending data to stream_load_pipe
+ // and waiting for it to be consumed.
+ // Therefore, we need to actively cancel to end the pipe.
ctx->body_sink->cancel(status->to_string());
}
- switch (ctx->load_src_type) {
- // reset the stream load ctx's kafka commit offset
- case TLoadSourceType::KAFKA:
- ctx->kafka_info->reset_offset();
- break;
- default:
- break;
- }
- }
- ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
- ctx->promise.set_value(*status);
-
- if (!status->ok() && ctx->body_sink != nullptr) {
- // In some cases, the load execution is exited early.
- // For example, when max_filter_ratio is 0 and illegal data is encountered
- // during stream loading, the entire load process is terminated early.
- // However, the http connection may still be sending data to stream_load_pipe
- // and waiting for it to be consumed.
- // Therefore, we need to actively cancel to end the pipe.
- ctx->body_sink->cancel(status->to_string());
- }
-
- if (ctx->need_commit_self && ctx->body_sink != nullptr) {
- if (ctx->body_sink->cancelled() || !status->ok()) {
- ctx->status = *status;
- this->rollback_txn(ctx.get());
- } else {
- this->commit_txn(ctx.get());
+ if (ctx->need_commit_self && ctx->body_sink != nullptr) {
+ if (ctx->body_sink->cancelled() || !status->ok()) {
+ ctx->status = *status;
+ this->rollback_txn(ctx.get());
+ } else {
+ this->commit_txn(ctx.get());
+ }
}
- }
- });
+ });
+ }
if (!st.ok()) {
// no need to check unref's return value
return st;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index fa4be7855a..f11d2ad373 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -20,12 +20,14 @@ package org.apache.doris.load.routineload;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TKafkaLoadInfo;
import org.apache.doris.thrift.TLoadSourceType;
+import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TUniqueId;
@@ -87,7 +89,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
tKafkaLoadInfo.setProperties(routineLoadJob.getConvertedCustomProperties());
tRoutineLoadTask.setKafkaLoadInfo(tKafkaLoadInfo);
tRoutineLoadTask.setType(TLoadSourceType.KAFKA);
- tRoutineLoadTask.setParams(rePlan(routineLoadJob));
+ if (Config.enable_pipeline_load) {
+ tRoutineLoadTask.setPipelineParams(rePlanForPipeline(routineLoadJob));
+ } else {
+ tRoutineLoadTask.setParams(rePlan(routineLoadJob));
+ }
tRoutineLoadTask.setMaxIntervalS(routineLoadJob.getMaxBatchIntervalS());
tRoutineLoadTask.setMaxBatchRows(routineLoadJob.getMaxBatchRows());
tRoutineLoadTask.setMaxBatchSize(routineLoadJob.getMaxBatchSizeBytes());
@@ -120,6 +126,15 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
return tExecPlanFragmentParams;
}
+ private TPipelineFragmentParams rePlanForPipeline(RoutineLoadJob routineLoadJob) throws UserException {
+ // Re-plan on every task so rollups / schema changes since the previous task are picked up.
+ TUniqueId taskLoadId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
+ TPipelineFragmentParams pipelineParams = routineLoadJob.planForPipeline(taskLoadId, txnId);
+ // stamp this task's txn id onto the OlapTableSink of the planned fragment
+ pipelineParams.getFragment().getOutputSink().getOlapTableSink().setTxnId(txnId);
+ return pipelineParams;
+ }
+
// implement method for compatibility
public String getHeaderType() {
return "";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 5d4d2ecf8c..56186cbda2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -61,6 +61,7 @@ import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.TransactionException;
@@ -829,6 +830,26 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
}
+ /** Plans one routine load task for the pipeline engine while holding the table read lock. */
+ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, long txnId) throws UserException {
+ Preconditions.checkNotNull(planner);
+ Database database = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
+ Table olapTable = database.getTableOrMetaException(tableId, Table.TableType.OLAP);
+ olapTable.readLock();
+ try {
+ TPipelineFragmentParams pipelineParams = planner.planForPipeline(loadId);
+ // register the destination table's indexes on the load transaction
+ TransactionState transactionState = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId);
+ if (transactionState == null) {
+ throw new MetaNotFoundException("txn does not exist: " + txnId);
+ }
+ transactionState.addTableIndexes(planner.getDestTable());
+ return pipelineParams;
+ } finally {
+ olapTable.readUnlock();
+ }
+ }
+
// if task not exists, before aborted will reset the txn attachment to null, task will not be updated
// if task pass the checker, task will be updated by attachment
// *** Please do not call before individually. It must be combined use with after ***
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index e5789e784f..ce65cf3564 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -54,6 +54,8 @@ import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPipelineFragmentParams;
+import org.apache.doris.thrift.TPipelineInstanceParams;
import org.apache.doris.thrift.TPlanFragmentExecParams;
import org.apache.doris.thrift.TQueryGlobals;
import org.apache.doris.thrift.TQueryOptions;
@@ -309,6 +311,206 @@ public class StreamLoadPlanner {
return params;
}
+ /** Plans this load as a single fragment (FileLoadScanNode -> OlapTableSink) in pipeline-engine params. */
+ public TPipelineFragmentParams planForPipeline(TUniqueId loadId) throws UserException {
+ if (destTable.getKeysType() != KeysType.UNIQUE_KEYS
+ && taskInfo.getMergeType() != LoadTask.MergeType.APPEND) {
+ throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables.");
+ }
+ if (taskInfo.getMergeType() != LoadTask.MergeType.APPEND
+ && !destTable.hasDeleteSign()) {
+ throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete.");
+ }
+
+ if (destTable.hasSequenceCol() && !taskInfo.hasSequenceCol() && destTable.getSequenceMapCol() == null) {
+ throw new UserException("Table " + destTable.getName()
+ " has sequence column, need to specify the sequence column");
+ }
+ if (!destTable.hasSequenceCol() && taskInfo.hasSequenceCol()) {
+ throw new UserException("There is no sequence column in the table " + destTable.getName());
+ }
+ resetAnalyzer();
+ // construct tuple descriptor, used for dataSink
+ tupleDesc = descTable.createTupleDescriptor("DstTableTuple");
+ // note: we use two tuples separately for Scan and Sink here to avoid wrong nullable info.
+ // construct tuple descriptor, used for scanNode
+ TupleDescriptor scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
+ boolean negative = taskInfo.getNegative();
+ // get partial update related info
+ boolean isPartialUpdate = taskInfo.isPartialUpdate();
+ if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
+ throw new UserException("Only unique key merge on write support partial update");
+ }
+ HashSet<String> partialUpdateInputColumns = new HashSet<>();
+ if (isPartialUpdate) {
+ for (Column col : destTable.getFullSchema()) {
+ boolean existInExpr = false;
+ for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) {
+ if (importColumnDesc.getColumnName() != null
+ && importColumnDesc.getColumnName().equals(col.getName())) {
+ if (!col.isVisible()) {
+ throw new UserException("Partial update should not include invisible column: "
+ col.getName());
+ }
+ partialUpdateInputColumns.add(col.getName());
+ existInExpr = true;
+ break;
+ }
+ }
+ if (col.isKey() && !existInExpr) {
+ throw new UserException("Partial update should include all key columns, missing: " + col.getName());
+ }
+ }
+ }
+ // use the full schema here to fill the descriptor table
+ for (Column col : destTable.getFullSchema()) {
+ if (isPartialUpdate && !partialUpdateInputColumns.contains(col.getName())) {
+ continue;
+ }
+ SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc);
+ slotDesc.setIsMaterialized(true);
+ slotDesc.setColumn(col);
+ slotDesc.setIsNullable(col.isAllowNull());
+ SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc);
+ scanSlotDesc.setIsMaterialized(true);
+ scanSlotDesc.setColumn(col);
+ scanSlotDesc.setIsNullable(col.isAllowNull());
+ for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) {
+ try {
+ if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null
+ && importColumnDesc.getColumnName().equals(col.getName())) {
+ scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable());
+ break;
+ }
+ } catch (Exception e) {
+ // An exception may be thrown here because the `importColumnDesc.getExpr()` is not analyzed now.
+ // We just skip this case here.
+ }
+ }
+ if (negative && !col.isKey() && col.getAggregationType() != AggregateType.SUM) {
+ throw new DdlException("Column is not SUM AggregateType. column:" + col.getName());
+ }
+ }
+
+ // Plan scan tuple of dynamic table
+ if (destTable.isDynamicSchema()) {
+ descTable.addReferencedTable(destTable);
+ scanTupleDesc.setTable(destTable);
+ // add an implicit container column "DORIS_DYNAMIC_COL" for dynamic columns
+ SlotDescriptor slotDesc = descTable.addSlotDescriptor(scanTupleDesc);
+ Column col = new Column(Column.DYNAMIC_COLUMN_NAME, Type.VARIANT, false, null, false, "",
+ "stream load auto dynamic column");
+ slotDesc.setIsMaterialized(true);
+ slotDesc.setColumn(col);
+ // Non-nullable slots will have 0 for the byte offset and -1 for the bit mask
+ slotDesc.setNullIndicatorBit(-1);
+ slotDesc.setNullIndicatorByte(0);
+ slotDesc.setIsNullable(false);
+ LOG.debug("plan tupleDesc {}", scanTupleDesc.toString());
+ }
+
+ // create scan node
+ FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), scanTupleDesc);
+ // 1. create file group
+ DataDescription dataDescription = new DataDescription(destTable.getName(), taskInfo);
+ dataDescription.analyzeWithoutCheckPriv(db.getFullName());
+ BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription);
+ fileGroup.parse(db, dataDescription);
+ // 2. create dummy file status
+ TBrokerFileStatus fileStatus = new TBrokerFileStatus();
+ if (taskInfo.getFileType() == TFileType.FILE_LOCAL) {
+ fileStatus.setPath(taskInfo.getPath());
+ fileStatus.setIsDir(false);
+ fileStatus.setSize(taskInfo.getFileSize()); // local file: use its actual size
+ } else {
+ fileStatus.setPath("");
+ fileStatus.setIsDir(false);
+ fileStatus.setSize(-1); // must set to -1, means stream.
+ }
+ // The load id will pass to csv reader to find the stream load context from new load stream manager
+ fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(),
+ fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns(),
+ taskInfo.isPartialUpdate());
+ scanNode = fileScanNode;
+
+ scanNode.init(analyzer);
+ scanNode.finalize(analyzer);
+ descTable.computeStatAndMemLayout();
+
+ int timeout = taskInfo.getTimeout();
+ if (taskInfo instanceof RoutineLoadJob) {
+ // For routine load, make the timeout for plan fragment larger than MaxIntervalS config.
+ // So that the execution won't be killed before consuming finished.
+ timeout *= 2;
+ }
+
+ // create dest sink
+ List<Long> partitionIds = getAllPartitionIds();
+ OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds,
+ Config.enable_single_replica_load);
+ olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
+ taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet());
+ olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);
+ olapTableSink.complete();
+
+ // for stream load, we only need one fragment, ScanNode -> DataSink.
+ // OlapTableSink can dispatch data to corresponding node.
+ PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.UNPARTITIONED);
+ fragment.setSink(olapTableSink);
+
+ fragment.finalize(null);
+
+ TPipelineFragmentParams pipParams = new TPipelineFragmentParams();
+ pipParams.setProtocolVersion(PaloInternalServiceVersion.V1);
+ pipParams.setFragment(fragment.toThrift());
+
+ pipParams.setDescTbl(analyzer.getDescTbl().toThrift());
+ pipParams.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+ pipParams.setQueryId(loadId);
+ pipParams.per_exch_num_senders = Maps.newHashMap();
+ pipParams.destinations = Lists.newArrayList();
+ pipParams.setNumSenders(1);
+
+ TPipelineInstanceParams localParams = new TPipelineInstanceParams();
+ localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + 1));
+
+ Map<Integer, List<TScanRangeParams>> perNodeScanRange = Maps.newHashMap();
+ List<TScanRangeParams> scanRangeParams = Lists.newArrayList();
+ for (TScanRangeLocations locations : scanNode.getScanRangeLocations(0)) {
+ scanRangeParams.add(new TScanRangeParams(locations.getScanRange()));
+ }
+ // For stream load, only one sender
+ localParams.setSenderId(0);
+ perNodeScanRange.put(scanNode.getId().asInt(), scanRangeParams);
+ localParams.setPerNodeScanRanges(perNodeScanRange);
+ pipParams.setLocalParams(Lists.newArrayList());
+ pipParams.getLocalParams().add(localParams);
+ TQueryOptions queryOptions = new TQueryOptions();
+ queryOptions.setQueryType(TQueryType.LOAD);
+ queryOptions.setQueryTimeout(timeout);
+ queryOptions.setExecutionTimeout(timeout);
+ queryOptions.setMemLimit(taskInfo.getMemLimit());
+ // for stream load, we use exec_mem_limit to limit the memory usage of load channel.
+ queryOptions.setLoadMemLimit(taskInfo.getMemLimit());
+ // execution engine options: pipeline switch, BE exec version, profile reporting
+ queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
+ queryOptions.setBeExecVersion(Config.be_exec_version);
+ queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
+
+ pipParams.setQueryOptions(queryOptions);
+ TQueryGlobals queryGlobals = new TQueryGlobals();
+ queryGlobals.setNowString(TimeUtils.DATETIME_FORMAT.format(LocalDateTime.now()));
+ queryGlobals.setTimestampMs(System.currentTimeMillis());
+ queryGlobals.setTimeZone(taskInfo.getTimezone());
+ queryGlobals.setLoadZeroTolerance(taskInfo.getMaxFilterRatio() <= 0.0);
+ queryGlobals.setNanoSeconds(LocalDateTime.now().getNano());
+
+ pipParams.setQueryGlobals(queryGlobals);
+
+ // LOG.debug("stream load txn id: {}, plan: {}", taskInfo.getTxnId(), pipParams);
+ return pipParams;
+ }
+
// get all specified partition ids.
// if no partition specified, return null
private List<Long> getAllPartitionIds() throws DdlException, AnalysisException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index 6f156030e7..77d4c52ac9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.proto.InternalService;
@@ -33,6 +34,8 @@ import org.apache.doris.thrift.TExecPlanFragmentParamsList;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPipelineFragmentParams;
+import org.apache.doris.thrift.TPipelineFragmentParamsList;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadPutRequest;
@@ -66,44 +69,87 @@ public class InsertStreamTxnExecutor {
StreamLoadPlanner planner = new StreamLoadPlanner(
txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask);
// Will using load id as query id in fragment
- TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
- BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
- List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
- if (beIds.isEmpty()) {
- throw new UserException("No available backend to match the policy: " + policy);
- }
+ if (Config.enable_pipeline_load) {
+ TPipelineFragmentParams tRequest = planner.planForPipeline(streamLoadTask.getId());
+ BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
+ List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+ if (beIds.isEmpty()) {
+ throw new UserException("No available backend to match the policy: " + policy);
+ }
- tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
- for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
- for (TScanRangeParams scanRangeParams : entry.getValue()) {
- scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
- TFileFormatType.FORMAT_PROTO);
- scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
- TFileCompressType.PLAIN);
+ tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
+ for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.local_params.get(0)
+ .per_node_scan_ranges.entrySet()) {
+ for (TScanRangeParams scanRangeParams : entry.getValue()) {
+ scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+ TFileFormatType.FORMAT_PROTO);
+ scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+ TFileCompressType.PLAIN);
+ }
+ }
+ txnConf.setFragmentInstanceId(tRequest.local_params.get(0).fragment_instance_id);
+ this.loadId = request.getLoadId();
+ this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+ .setHi(loadId.getHi())
+ .setLo(loadId.getLo()).build());
+
+ Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
+ txnConf.setUserIp(backend.getHost());
+ txnEntry.setBackend(backend);
+ TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+ try {
+ TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList();
+ paramsList.addToParamsList(tRequest);
+ Future<InternalService.PExecPlanFragmentResult> future =
+ BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false);
+ InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
+ TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code != TStatusCode.OK) {
+ throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
+ }
+ } catch (RpcException e) {
+ throw new TException(e);
+ }
+ } else {
+ TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
+ BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
+ List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+ if (beIds.isEmpty()) {
+ throw new UserException("No available backend to match the policy: " + policy);
}
- }
- txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
- this.loadId = request.getLoadId();
- this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
- .setHi(loadId.getHi())
- .setLo(loadId.getLo()).build());
- Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
- txnConf.setUserIp(backend.getHost());
- txnEntry.setBackend(backend);
- TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
- try {
- TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
- paramsList.addToParamsList(tRequest);
- Future<InternalService.PExecPlanFragmentResult> future =
- BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false);
- InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
- TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
- if (code != TStatusCode.OK) {
- throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
+ tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
+ for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
+ for (TScanRangeParams scanRangeParams : entry.getValue()) {
+ scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+ TFileFormatType.FORMAT_PROTO);
+ scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+ TFileCompressType.PLAIN);
+ }
+ }
+ txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
+ this.loadId = request.getLoadId();
+ this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+ .setHi(loadId.getHi())
+ .setLo(loadId.getLo()).build());
+
+ Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
+ txnConf.setUserIp(backend.getHost());
+ txnEntry.setBackend(backend);
+ TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+ try {
+ TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
+ paramsList.addToParamsList(tRequest);
+ Future<InternalService.PExecPlanFragmentResult> future =
+ BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false);
+ InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
+ TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code != TStatusCode.OK) {
+ throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
+ }
+ } catch (RpcException e) {
+ throw new TException(e);
}
- } catch (RpcException e) {
- throw new TException(e);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 8f21f64b2f..b650100823 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -127,6 +127,7 @@ import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TMasterResult;
import org.apache.doris.thrift.TMySqlLoadAcquireTokenResult;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TPrivilegeCtrl;
import org.apache.doris.thrift.TPrivilegeHier;
import org.apache.doris.thrift.TPrivilegeStatus;
@@ -1568,7 +1569,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
try {
- result.setParams(streamLoadPutImpl(request));
+ if (Config.enable_pipeline_load) {
+ result.setPipelineParams(pipelineStreamLoadPutImpl(request));
+ } else {
+ result.setParams(streamLoadPutImpl(request));
+ }
} catch (UserException e) {
LOG.warn("failed to get stream load plan: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
@@ -1621,6 +1626,45 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
}
+ /**
+ * Builds a pipeline-engine plan (TPipelineFragmentParams) for a stream load put request.
+ * Used instead of streamLoadPutImpl when Config.enable_pipeline_load is enabled.
+ *
+ * @param request the stream load put request; supplies the cluster, db, table and txn id,
+ *                plus an optional thrift RPC timeout that bounds the table read-lock wait
+ * @return the fragment params produced by StreamLoadPlanner.planForPipeline
+ * @throws UserException if the database is unknown, the table read lock is not acquired
+ *                       within the timeout, or no transaction state exists for the txn id.
+ *                       Table lookup and planning failures may also propagate here
+ *                       (presumably as UserException subclasses, TODO confirm).
+ */
+ private TPipelineFragmentParams pipelineStreamLoadPutImpl(TStreamLoadPutRequest request) throws UserException {
+ String cluster = request.getCluster();
+ // Fall back to the default cluster when the request does not name one.
+ if (Strings.isNullOrEmpty(cluster)) {
+ cluster = SystemInfoService.DEFAULT_CLUSTER;
+ }
+
+ Env env = Env.getCurrentEnv();
+ String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb());
+ Database db = env.getInternalCatalog().getDbNullable(fullDbName);
+ if (db == null) {
+ // Without an explicit cluster, report the db name exactly as given in the request
+ // rather than the cluster-qualified name.
+ String dbName = fullDbName;
+ if (Strings.isNullOrEmpty(request.getCluster())) {
+ dbName = request.getDb();
+ }
+ throw new UserException("unknown database, database=" + dbName);
+ }
+ // Wait for the table read lock at most the request's thrift RPC timeout, or 5000 ms if unset.
+ long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000;
+ // Look up the table as an OLAP table; the (OlapTable) casts below rely on this.
+ Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP);
+ if (!table.tryReadLock(timeoutMs, TimeUnit.MILLISECONDS)) {
+ throw new UserException(
+ "get table read lock timeout, database=" + fullDbName + ",table=" + table.getName());
+ }
+ // Planning and the transaction-state update run under the table read lock,
+ // which the finally block always releases.
+ try {
+ StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
+ StreamLoadPlanner planner = new StreamLoadPlanner(db, (OlapTable) table, streamLoadTask);
+ TPipelineFragmentParams plan = planner.planForPipeline(streamLoadTask.getId());
+ // add table indexes to transaction state
+ TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
+ .getTransactionState(db.getId(), request.getTxnId());
+ if (txnState == null) {
+ throw new UserException("txn does not exist: " + request.getTxnId());
+ }
+ txnState.addTableIndexes((OlapTable) table);
+ return plan;
+ } finally {
+ table.readUnlock();
+ }
+ }
+
@Override
public TStatus snapshotLoaderReport(TSnapshotLoaderReportRequest request) throws TException {
if (Env.getCurrentEnv().getBackupHandler().report(request.getTaskType(), request.getJobId(),
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 60beb0b27e..62f7909304 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -65,6 +65,7 @@ struct TRoutineLoadTask {
12: optional TKafkaLoadInfo kafka_load_info
13: optional PaloInternalService.TExecPlanFragmentParams params
14: optional PlanNodes.TFileFormatType format
+ 15: optional PaloInternalService.TPipelineFragmentParams pipeline_params
}
struct TKafkaMetaProxyRequest {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 1f64965a1c..5aecdc4ba2 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -609,6 +609,7 @@ struct TStreamLoadPutResult {
1: required Status.TStatus status
// valid when status is OK
2: optional PaloInternalService.TExecPlanFragmentParams params
+ 3: optional PaloInternalService.TPipelineFragmentParams pipeline_params
}
struct TKafkaRLTaskProgress {
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 0c1a4feee8..734d70bb1a 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -625,6 +625,7 @@ struct TPipelineFragmentParams {
23: optional Planner.TPlanFragment fragment
24: list<TPipelineInstanceParams> local_params
26: optional list<TPipelineWorkloadGroup> workload_groups
+ 27: optional TTxnParams txn_conf
}
struct TPipelineFragmentParamsList {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org