You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/21 02:54:23 UTC
[incubator-doris] branch master updated: [Feature][Insert] Add
transaction for the operation of insert #6244 (#6245)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7592f52 [Feature][Insert] Add transaction for the operation of insert #6244 (#6245)
7592f52 is described below
commit 7592f52d2e20266e2755cee74242e90226355f57
Author: pengxiangyu <di...@163.com>
AuthorDate: Wed Jul 21 10:54:11 2021 +0800
[Feature][Insert] Add transaction for the operation of insert #6244 (#6245)
## Proposed changes
Add transaction support for the insert operation. When you want to insert a large number of rows, inserting within a transaction costs far less time than inserting without one (roughly 1/1000 of the time).
### Syntax
```
BEGIN [ WITH LABEL label];
INSERT INTO table_name ...
[COMMIT | ROLLBACK];
```
### Example
commit a transaction:
```
begin;
insert into Tbl values(11, 22, 33);
commit;
```
rollback a transaction:
```
begin;
insert into Tbl values(11, 22, 33);
rollback;
```
commit a transaction with label:
```
begin with label test_label;
insert into Tbl values(11, 22, 33);
commit;
```
### Description
```
begin: begin a transaction, the next insert will execute in the transaction until commit/rollback;
commit: commit the transaction, the data in the transaction will be inserted into the table;
rollback: abort the transaction, nothing will be inserted into the table;
```
### The main realization principle:
```
1. begin a transaction in the session. next sql is executed in the transaction;
2. the insert sql will be parsed to get the database name and table name, which will be used to select a be and create a pipe to accept data;
3. all inserted values will be sent to the be and write into the pipe;
4. a thread will get the data from the pipe, then write them to disk;
5. commit will complete this transaction and make these data visible;
6. rollback will abort this transaction
```
### Some restrictions on the use of transactions.
1. Only ```insert``` can be called in a transaction.
2. If an error happens, ```commit``` will not succeed; the transaction will ```rollback``` directly;
3. By default, if some of the inserts in the transaction are invalid, ```commit``` will only insert the remaining valid data into the table.
4. If you need ```commit``` to fail when any insert in the transaction is invalid, you need to execute ```set enable_insert_strict = true``` before ```begin```.
---
be/src/common/utils.h | 4 +
be/src/exec/CMakeLists.txt | 1 +
be/src/exec/broker_scanner.cpp | 93 ++--
be/src/exec/broker_scanner.h | 2 +
.../plain_binary_line_reader.cpp} | 52 +--
.../plain_binary_line_reader.h} | 26 +-
be/src/runtime/fragment_mgr.cpp | 72 ++-
be/src/runtime/fragment_mgr.h | 5 +
be/src/runtime/stream_load/stream_load_context.h | 2 +
.../runtime/stream_load/stream_load_executor.cpp | 27 +-
be/src/runtime/stream_load/stream_load_executor.h | 4 +
be/src/runtime/stream_load/stream_load_pipe.h | 36 +-
be/src/service/internal_service.cpp | 62 +++
be/src/service/internal_service.h | 12 +
.../sql-statements/Data Manipulation/BEGIN.md | 92 ++++
.../Data Manipulation/SHOW TRANSACTION.md | 10 +-
.../sql-statements/Data Manipulation/BEGIN.md | 93 ++++
.../Data Manipulation/SHOW TRANSACTION.md | 9 +-
fe/fe-core/src/main/cup/sql_parser.cup | 35 +-
.../main/java/org/apache/doris/analysis/Expr.java | 3 +
.../java/org/apache/doris/analysis/InsertStmt.java | 18 +-
.../java/org/apache/doris/analysis/IntLiteral.java | 62 +--
.../apache/doris/analysis/ShowTransactionStmt.java | 24 +-
.../doris/analysis/TransactionBeginStmt.java | 45 ++
.../doris/analysis/TransactionCommitStmt.java | 32 +-
.../doris/analysis/TransactionRollbackStmt.java | 32 +-
.../org/apache/doris/analysis/TransactionStmt.java | 40 +-
.../main/java/org/apache/doris/common/Config.java | 7 +
.../java/org/apache/doris/planner/Planner.java | 2 +-
.../java/org/apache/doris/qe/ConnectContext.java | 48 ++
.../java/org/apache/doris/qe/ConnectScheduler.java | 1 +
.../apache/doris/qe/InsertStreamTxnExecutor.java | 188 ++++++++
.../org/apache/doris/qe/MasterTxnExecutor.java | 141 ++++++
.../java/org/apache/doris/qe/ShowExecutor.java | 9 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 498 ++++++++++++++++-----
.../org/apache/doris/rpc/BackendServiceClient.java | 12 +
.../org/apache/doris/rpc/BackendServiceProxy.java | 43 ++
.../apache/doris/service/FrontendServiceImpl.java | 76 +++-
.../doris/transaction/DatabaseTransactionMgr.java | 14 +
.../doris/transaction/GlobalTransactionMgr.java | 53 +++
.../apache/doris/transaction/TransactionEntry.java | 119 +++++
.../apache/doris/transaction/TransactionState.java | 13 +-
.../java/org/apache/doris/qe/StmtExecutorTest.java | 3 +-
gensrc/proto/internal_service.proto | 36 ++
gensrc/thrift/FrontendService.thrift | 21 +
gensrc/thrift/PaloInternalService.thrift | 13 +
gensrc/thrift/PlanNodes.thrift | 1 +
47 files changed, 1826 insertions(+), 365 deletions(-)
diff --git a/be/src/common/utils.h b/be/src/common/utils.h
index 4798483..50c6344 100644
--- a/be/src/common/utils.h
+++ b/be/src/common/utils.h
@@ -28,6 +28,7 @@ struct AuthInfo {
std::string user_ip;
// -1 as unset
int64_t auth_code = -1;
+ std::string auth_code_uuid = "";
};
template <class T>
@@ -39,6 +40,9 @@ void set_request_auth(T* req, const AuthInfo& auth) {
// so they have to be set.
req->user = "";
req->passwd = "";
+ } else if (auth.auth_code_uuid != "") {
+ req->__isset.auth_code_uuid = true;
+ req->auth_code_uuid = auth.auth_code_uuid;
} else {
req->user = auth.user;
req->passwd = auth.passwd;
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 27ef5ed..9e826cc 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -54,6 +54,7 @@ set(EXEC_FILES
olap_common.cpp
tablet_info.cpp
tablet_sink.cpp
+ plain_binary_line_reader.cpp
plain_text_line_reader.cpp
csv_scan_node.cpp
csv_scanner.cpp
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index b275647..058b3e8 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -25,6 +25,7 @@
#include "exec/decompressor.h"
#include "exec/exec_node.h"
#include "exec/local_file_reader.h"
+#include "exec/plain_binary_line_reader.h"
#include "exec/plain_text_line_reader.h"
#include "exec/s3_reader.h"
#include "exec/text_converter.h"
@@ -222,6 +223,7 @@ Status BrokerScanner::create_decompressor(TFileFormatType::type type) {
switch (type) {
case TFileFormatType::FORMAT_CSV_PLAIN:
case TFileFormatType::FORMAT_JSON:
+ case TFileFormatType::FORMAT_PROTO:
compress_type = CompressType::UNCOMPRESSED;
break;
case TFileFormatType::FORMAT_CSV_GZ:
@@ -279,6 +281,7 @@ Status BrokerScanner::open_line_reader() {
// _decompressor may be NULL if this is not a compressed file
RETURN_IF_ERROR(create_decompressor(range.format_type));
+ _file_format_type = range.format_type;
// open line reader
switch (range.format_type) {
case TFileFormatType::FORMAT_CSV_PLAIN:
@@ -290,6 +293,9 @@ Status BrokerScanner::open_line_reader() {
_cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, _cur_decompressor,
size, _line_delimiter, _line_delimiter_length);
break;
+ case TFileFormatType::FORMAT_PROTO:
+ _cur_line_reader = new PlainBinaryLineReader(_cur_file_reader);
+ break;
default: {
std::stringstream ss;
ss << "Unknown format type, cannot init line reader, type=" << range.format_type;
@@ -326,40 +332,53 @@ void BrokerScanner::close() {
void BrokerScanner::split_line(const Slice& line) {
_split_values.clear();
- const char* value = line.data;
- size_t start = 0; // point to the start pos of next col value.
- size_t curpos = 0; // point to the start pos of separator matching sequence.
- size_t p1 = 0; // point to the current pos of separator matching sequence.
-
- // Separator: AAAA
- //
- // curpos
- // ▼
- // AAAA
- // 1000AAAA2000AAAA
- // ▲ ▲
- // Start │
- // p1
-
- while (curpos < line.size) {
- if (*(value + curpos + p1) != _value_separator[p1]) {
- // Not match, move forward:
- curpos += (p1 == 0 ? 1 : p1);
- p1 = 0;
- } else {
- p1++;
- if (p1 == _value_separator_length) {
- // Match a separator
- _split_values.emplace_back(value + start, curpos - start);
- start = curpos + _value_separator_length;
- curpos = start;
+ if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
+ PDataRow** ptr = reinterpret_cast<PDataRow**>(line.data);
+ PDataRow* row = *ptr;
+ for (const PDataColumn& col : (row)->col()) {
+ int len = col.value().size();
+ uint8_t* buf = new uint8_t[len];
+ memcpy(buf, col.value().c_str(), len);
+ _split_values.emplace_back(buf, len);
+ }
+ delete row;
+ delete ptr;
+ } else {
+ const char *value = line.data;
+ size_t start = 0; // point to the start pos of next col value.
+ size_t curpos = 0; // point to the start pos of separator matching sequence.
+ size_t p1 = 0; // point to the current pos of separator matching sequence.
+
+ // Separator: AAAA
+ //
+ // curpos
+ // ▼
+ // AAAA
+ // 1000AAAA2000AAAA
+ // ▲ ▲
+ // Start │
+ // p1
+
+ while (curpos < line.size) {
+ if (*(value + curpos + p1) != _value_separator[p1]) {
+ // Not match, move forward:
+ curpos += (p1 == 0 ? 1 : p1);
p1 = 0;
+ } else {
+ p1++;
+ if (p1 == _value_separator_length) {
+ // Match a separator
+ _split_values.emplace_back(value + start, curpos - start);
+ start = curpos + _value_separator_length;
+ curpos = start;
+ p1 = 0;
+ }
}
}
- }
- CHECK(curpos == line.size) << curpos << " vs " << line.size;
- _split_values.emplace_back(value + start, curpos - start);
+ CHECK(curpos == line.size) << curpos << " vs " << line.size;
+ _split_values.emplace_back(value + start, curpos - start);
+ }
}
void BrokerScanner::fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p,
@@ -454,7 +473,7 @@ bool BrokerScanner::convert_one_row(const Slice& line, Tuple* tuple, MemPool* tu
// Convert one row to this tuple
bool BrokerScanner::line_to_src_tuple(const Slice& line) {
- if (!validate_utf8(line.data, line.size)) {
+ if (_file_format_type != TFileFormatType::FORMAT_PROTO && !validate_utf8(line.data, line.size)) {
std::stringstream error_msg;
error_msg << "data is not encoded by UTF-8";
_state->append_error_msg_to_file("Unable to display", error_msg.str());
@@ -474,7 +493,11 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
<< _value_separator << "], "
<< "line delimiter: [" << _line_delimiter << "], "
<< "schema number: " << _src_slot_descs.size() << "; ";
- _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
+ if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
+ _state->append_error_msg_to_file("", error_msg.str());
+ } else {
+ _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
+ }
_counter->num_rows_filtered++;
return false;
} else if (_split_values.size() + columns_from_path.size() > _src_slot_descs.size()) {
@@ -484,7 +507,11 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
<< _value_separator << "], "
<< "line delimiter: [" << _line_delimiter << "], "
<< "schema number: " << _src_slot_descs.size() << "; ";
- _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
+ if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
+ _state->append_error_msg_to_file("", error_msg.str());
+ } else {
+ _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
+ }
_counter->num_rows_filtered++;
return false;
}
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index 4578a77..1da3e11 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -27,6 +27,7 @@
#include "exec/base_scanner.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
+#include "gen_cpp/internal_service.pb.h"
#include "runtime/mem_pool.h"
#include "util/runtime_profile.h"
#include "util/slice.h"
@@ -99,6 +100,7 @@ private:
std::string _value_separator;
std::string _line_delimiter;
+ TFileFormatType::type _file_format_type;
int _value_separator_length;
int _line_delimiter_length;
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/exec/plain_binary_line_reader.cpp
similarity index 55%
copy from be/src/runtime/stream_load/stream_load_executor.h
copy to be/src/exec/plain_binary_line_reader.cpp
index 127e710..e680e15 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/exec/plain_binary_line_reader.cpp
@@ -15,34 +15,34 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
+#include "exec/plain_binary_line_reader.h"
-namespace doris {
-
-class ExecEnv;
-class StreamLoadContext;
-class Status;
-class TTxnCommitAttachment;
-
-class StreamLoadExecutor {
-public:
- StreamLoadExecutor(ExecEnv* exec_env) : _exec_env(exec_env) {}
-
- Status begin_txn(StreamLoadContext* ctx);
+#include "common/status.h"
+#include "exec/file_reader.h"
- Status commit_txn(StreamLoadContext* ctx);
-
- void rollback_txn(StreamLoadContext* ctx);
-
- Status execute_plan_fragment(StreamLoadContext* ctx);
-
-private:
- // collect the load statistics from context and set them to stat
- // return true if stat is set, otherwise, return false
- bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment);
+namespace doris {
-private:
- ExecEnv* _exec_env;
-};
+PlainBinaryLineReader::PlainBinaryLineReader(FileReader* file_reader)
+ : _file_reader(file_reader) {
+}
+
+PlainBinaryLineReader::~PlainBinaryLineReader() {
+ close();
+}
+
+void PlainBinaryLineReader::close() {
+}
+
+Status PlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size, bool* eof) {
+ std::unique_ptr<uint8_t[]> file_buf;
+ int64_t read_size = 0;
+ RETURN_IF_ERROR(_file_reader->read_one_message(&file_buf, &read_size));
+ *ptr = file_buf.release();
+ *size = read_size;
+ if (read_size == 0) {
+ *eof = true;
+ }
+ return Status::OK();
+}
} // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/exec/plain_binary_line_reader.h
similarity index 58%
copy from be/src/runtime/stream_load/stream_load_executor.h
copy to be/src/exec/plain_binary_line_reader.h
index 127e710..9e1143b 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/exec/plain_binary_line_reader.h
@@ -17,32 +17,24 @@
#pragma once
+#include "exec/line_reader.h"
+
namespace doris {
-class ExecEnv;
-class StreamLoadContext;
-class Status;
-class TTxnCommitAttachment;
+class FileReader;
-class StreamLoadExecutor {
+class PlainBinaryLineReader : public LineReader {
public:
- StreamLoadExecutor(ExecEnv* exec_env) : _exec_env(exec_env) {}
-
- Status begin_txn(StreamLoadContext* ctx);
+ PlainBinaryLineReader(FileReader* file_reader);
- Status commit_txn(StreamLoadContext* ctx);
+ virtual ~PlainBinaryLineReader();
- void rollback_txn(StreamLoadContext* ctx);
+ virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override;
- Status execute_plan_fragment(StreamLoadContext* ctx);
-
-private:
- // collect the load statistics from context and set them to stat
- // return true if stat is set, otherwise, return false
- bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment);
+ virtual void close() override;
private:
- ExecEnv* _exec_env;
+ FileReader* _file_reader;
};
} // namespace doris
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index a7832f8..415e188 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -40,6 +40,9 @@
#include "runtime/exec_env.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/runtime_filter_mgr.h"
+#include "runtime/stream_load/stream_load_pipe.h"
+#include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/stream_load_context.h"
#include "service/backend_options.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
@@ -136,6 +139,9 @@ public:
std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() { return _fragments_ctx; }
+ void set_pipe(std::shared_ptr<StreamLoadPipe> pipe) { _pipe = pipe; }
+ std::shared_ptr<StreamLoadPipe> get_pipe() const { return _pipe; }
+
private:
void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done);
@@ -166,6 +172,8 @@ private:
std::shared_ptr<QueryFragmentsCtx> _fragments_ctx;
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
+ // The pipe for data transfering, such as insert.
+ std::shared_ptr<StreamLoadPipe> _pipe;
};
FragmentExecState::FragmentExecState(const TUniqueId& query_id,
@@ -239,6 +247,9 @@ Status FragmentExecState::cancel_before_execute() {
// set status as 'abort', cuz cancel() won't effect the status arg of DataSink::close().
_executor.set_abort();
_executor.cancel();
+ if (_pipe != nullptr) {
+ _pipe->cancel();
+ }
return Status::OK();
}
@@ -249,6 +260,9 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason) {
_executor.set_is_report_on_cancel(false);
}
_executor.cancel();
+ if (_pipe != nullptr) {
+ _pipe->cancel();
+ }
return Status::OK();
}
@@ -457,7 +471,63 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
}
Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
- return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1));
+ if (params.txn_conf.need_txn) {
+ StreamLoadContext* stream_load_cxt = new StreamLoadContext(_exec_env);
+ stream_load_cxt->db = params.txn_conf.db;
+ stream_load_cxt->db_id = params.txn_conf.db_id;
+ stream_load_cxt->table = params.txn_conf.tbl;
+ stream_load_cxt->txn_id = params.txn_conf.txn_id;
+ stream_load_cxt->id = UniqueId(params.params.query_id);
+ stream_load_cxt->put_result.params = params;
+ stream_load_cxt->use_streaming = true;
+ stream_load_cxt->load_type = TLoadType::MANUL_LOAD;
+ stream_load_cxt->load_src_type = TLoadSourceType::RAW;
+ stream_load_cxt->label = params.import_label;
+ stream_load_cxt->format = TFileFormatType::FORMAT_CSV_PLAIN;
+ stream_load_cxt->timeout_second = 3600;
+ stream_load_cxt->auth.auth_code_uuid = params.txn_conf.auth_code_uuid;
+ stream_load_cxt->need_commit_self = true;
+ stream_load_cxt->need_rollback = true;
+ // total_length == -1 means read one message from pipe in once time, don't care the length.
+ auto pipe = std::make_shared<StreamLoadPipe>(
+ 1024 * 1024 /* max_buffered_bytes */,
+ 64 * 1024 /* min_chunk_size */,
+ -1 /* total_length */,
+ true /* use_proto */);
+ stream_load_cxt->body_sink = pipe;
+ stream_load_cxt->max_filter_ratio = params.txn_conf.max_filter_ratio;
+
+ RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_cxt->id, pipe));
+
+ RETURN_IF_ERROR(
+ _exec_env->stream_load_executor()->execute_plan_fragment(stream_load_cxt, pipe));
+ set_pipe(params.params.fragment_instance_id, pipe);
+ return Status::OK();
+ } else {
+ return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1));
+ }
+}
+
+void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<StreamLoadPipe> pipe) {
+ {
+ std::lock_guard<std::mutex> lock(_lock);
+ auto iter = _fragment_map.find(fragment_instance_id);
+ if (iter != _fragment_map.end()) {
+ _fragment_map[fragment_instance_id]->set_pipe(pipe);
+ }
+ }
+}
+
+std::shared_ptr<StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId& fragment_instance_id) {
+ {
+ std::lock_guard<std::mutex> lock(_lock);
+ auto iter = _fragment_map.find(fragment_instance_id);
+ if (iter != _fragment_map.end()) {
+ return _fragment_map[fragment_instance_id]->get_pipe();
+ } else {
+ return nullptr;
+ }
+ }
}
Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index a6329cb..2ea0cba 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -48,6 +48,7 @@ class TExecPlanFragmentParams;
class TExecPlanFragmentParamsList;
class TUniqueId;
class RuntimeFilterMergeController;
+class StreamLoadPipe;
std::string to_load_error_http_path(const std::string& file_name);
@@ -88,6 +89,10 @@ public:
Status merge_filter(const PMergeFilterRequest* request, const char* attach_data);
+ void set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<StreamLoadPipe> pipe);
+
+ std::shared_ptr<StreamLoadPipe> get_pipe(const TUniqueId& fragment_instance_id);
+
private:
void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb);
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index aeca593..f6e7b4a 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -126,6 +126,7 @@ public:
UniqueId id;
std::string db;
+ int64_t db_id = -1;
std::string table;
std::string label;
// optional
@@ -193,6 +194,7 @@ public:
// to identified a specified data consumer.
int64_t consumer_id;
+ bool need_commit_self = false;
public:
ExecEnv* exec_env() { return _exec_env; }
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 3c9a237..781792b 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -42,6 +42,10 @@ Status k_stream_load_plan_status;
#endif
Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
+ return execute_plan_fragment(ctx, nullptr);
+}
+
+Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx, std::shared_ptr<StreamLoadPipe> pipe) {
DorisMetrics::instance()->txn_exec_plan_total->increment(1);
// submit this params
#ifndef BE_TEST
@@ -50,7 +54,7 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id
<< ", query_id=" << print_id(ctx->put_result.params.params.query_id);
auto st = _exec_env->fragment_mgr()->exec_plan_fragment(
- ctx->put_result.params, [ctx](PlanFragmentExecutor* executor) {
+ ctx->put_result.params, [ctx, pipe, this](PlanFragmentExecutor* executor) {
ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos());
Status status = executor->status();
if (status.ok()) {
@@ -105,6 +109,15 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
ctx->promise.set_value(status);
+ if (ctx->need_commit_self && pipe != nullptr) {
+ if (pipe->closed() || !status.ok()) {
+ ctx->status = status;
+ this->rollback_txn(ctx);
+ } else {
+ this->commit_txn(ctx);
+ }
+ }
+
if (ctx->unref()) {
delete ctx;
}
@@ -119,7 +132,6 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
#endif
return Status::OK();
}
-
Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
DorisMetrics::instance()->txn_begin_request_total->increment(1);
@@ -156,6 +168,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
return status;
}
ctx->txn_id = result.txnId;
+ if (result.__isset.db_id) {
+ ctx->db_id = result.db_id;
+ }
ctx->need_rollback = true;
return Status::OK();
@@ -167,6 +182,10 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
TLoadTxnCommitRequest request;
set_request_auth(&request, ctx->auth);
request.db = ctx->db;
+ if (ctx->db_id > 0) {
+ request.db_id = ctx->db_id;
+ request.__isset.db_id = true;
+ }
request.tbl = ctx->table;
request.txnId = ctx->txn_id;
request.sync = true;
@@ -217,6 +236,10 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
TLoadTxnRollbackRequest request;
set_request_auth(&request, ctx->auth);
request.db = ctx->db;
+ if (ctx->db_id > 0) {
+ request.db_id = ctx->db_id;
+ request.__isset.db_id = true;
+ }
request.tbl = ctx->table;
request.txnId = ctx->txn_id;
request.__set_reason(ctx->status.get_error_msg());
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h
index 127e710..4ba791c 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/runtime/stream_load/stream_load_executor.h
@@ -17,12 +17,15 @@
#pragma once
+#include <memory>
+
namespace doris {
class ExecEnv;
class StreamLoadContext;
class Status;
class TTxnCommitAttachment;
+class StreamLoadPipe;
class StreamLoadExecutor {
public:
@@ -36,6 +39,7 @@ public:
Status execute_plan_fragment(StreamLoadContext* ctx);
+ Status execute_plan_fragment(StreamLoadContext* ctx, std::shared_ptr<StreamLoadPipe> pipe);
private:
// collect the load statistics from context and set them to stat
// return true if stat is set, otherwise, return false
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h
index 95b0adf..3912a0f 100644
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ b/be/src/runtime/stream_load/stream_load_pipe.h
@@ -25,6 +25,7 @@
#include "runtime/message_body_sink.h"
#include "util/bit_util.h"
#include "util/byte_buffer.h"
+#include "gen_cpp/internal_service.pb.h"
namespace doris {
@@ -33,22 +34,24 @@ namespace doris {
class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024,
- int64_t total_length = -1)
+ int64_t total_length = -1, bool use_proto = false)
: _buffered_bytes(0),
+ _proto_buffered_bytes(0),
_max_buffered_bytes(max_buffered_bytes),
_min_chunk_size(min_chunk_size),
_total_length(total_length),
+ _use_proto(use_proto),
_finished(false),
_cancelled(false) {}
virtual ~StreamLoadPipe() {}
Status open() override { return Status::OK(); }
- Status append_and_flush(const char* data, size_t size) {
+ Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0) {
ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1));
buf->put_bytes(data, size);
buf->flip();
- return _append(buf);
+ return _append(buf, proto_byte_size);
}
Status append(const char* data, size_t size) override {
@@ -210,23 +213,38 @@ private:
_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
+ if (_use_proto) {
+ PDataRow** ptr = reinterpret_cast<PDataRow**>(data->get());
+ _proto_buffered_bytes -= (sizeof(PDataRow*) + (*ptr)->GetCachedSize());
+ }
_put_cond.notify_one();
return Status::OK();
}
- Status _append(const ByteBufferPtr& buf) {
+ Status _append(const ByteBufferPtr& buf, size_t proto_byte_size = 0) {
{
std::unique_lock<std::mutex> l(_lock);
// if _buf_queue is empty, we append this buf without size check
- while (!_cancelled && !_buf_queue.empty() &&
- _buffered_bytes + buf->remaining() > _max_buffered_bytes) {
- _put_cond.wait(l);
+ if (_use_proto) {
+ while (!_cancelled && !_buf_queue.empty() &&
+ (_proto_buffered_bytes + proto_byte_size > _max_buffered_bytes)) {
+ _put_cond.wait(l);
+ }
+ } else {
+ while (!_cancelled && !_buf_queue.empty() &&
+ _buffered_bytes + buf->remaining() > _max_buffered_bytes) {
+ _put_cond.wait(l);
+ }
}
if (_cancelled) {
return Status::InternalError("cancelled");
}
_buf_queue.push_back(buf);
- _buffered_bytes += buf->remaining();
+ if (_use_proto) {
+ _proto_buffered_bytes += proto_byte_size;
+ } else {
+ _buffered_bytes += buf->remaining();
+ }
}
_get_cond.notify_one();
return Status::OK();
@@ -235,6 +253,7 @@ private:
// Blocking queue
std::mutex _lock;
size_t _buffered_bytes;
+ size_t _proto_buffered_bytes;
size_t _max_buffered_bytes;
size_t _min_chunk_size;
// The total amount of data expected to be read.
@@ -245,6 +264,7 @@ private:
// and the length is unknown.
// size_t is unsigned, so use int64_t
int64_t _total_length = -1;
+ bool _use_proto = false;
std::deque<ByteBufferPtr> _buf_queue;
std::condition_variable _put_cond;
std::condition_variable _get_cond;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index a24f356..fbe7cbb 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -289,6 +289,67 @@ void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* co
}
template<typename T>
+void PInternalServiceImpl<T>::send_data(google::protobuf::RpcController* controller,
+ const PSendDataRequest* request,
+ PSendDataResult* response,
+ google::protobuf::Closure* done) {
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId fragment_instance_id;
+ fragment_instance_id.hi = request->fragment_instance_id().hi();
+ fragment_instance_id.lo = request->fragment_instance_id().lo();
+ auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+ if (pipe == nullptr) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs("pipe is null");
+ } else {
+ for (int i = 0; i < request->data_size(); ++i) {
+ PDataRow* row = new PDataRow();
+ row->CopyFrom(request->data(i));
+ pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row), sizeof(row) + row->ByteSize());
+ }
+ response->mutable_status()->set_status_code(0);
+ }
+}
+
+template<typename T>
+void PInternalServiceImpl<T>::commit(google::protobuf::RpcController* controller,
+ const PCommitRequest* request,
+ PCommitResult* response,
+ google::protobuf::Closure* done) {
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId fragment_instance_id;
+ fragment_instance_id.hi = request->fragment_instance_id().hi();
+ fragment_instance_id.lo = request->fragment_instance_id().lo();
+ auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+ if (pipe == nullptr) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs("pipe is null");
+ } else {
+ pipe->finish();
+ response->mutable_status()->set_status_code(0);
+ }
+}
+
+template<typename T>
+void PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* controller,
+ const PRollbackRequest* request,
+ PRollbackResult* response,
+ google::protobuf::Closure* done) {
+ brpc::ClosureGuard closure_guard(done);
+ TUniqueId fragment_instance_id;
+ fragment_instance_id.hi = request->fragment_instance_id().hi();
+ fragment_instance_id.lo = request->fragment_instance_id().lo();
+ auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+ if (pipe == nullptr) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs("pipe is null");
+ } else {
+ pipe->cancel();
+ response->mutable_status()->set_status_code(0);
+ }
+}
+
+template<typename T>
void PInternalServiceImpl<T>::fold_constant_expr(
google::protobuf::RpcController* cntl_base,
const PConstantExprRequest* request,
@@ -324,6 +385,7 @@ Status PInternalServiceImpl<T>::_fold_constant_expr(const std::string& ser_reque
FoldConstantMgr mgr(_exec_env);
return mgr.fold_constant_expr(t_request, response);
}
+
template class PInternalServiceImpl<PBackendService>;
template class PInternalServiceImpl<palo::PInternalService>;
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index b51680d..9a4f240 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -93,6 +93,18 @@ public:
::doris::PPublishFilterResponse* response,
::google::protobuf::Closure* done) override;
+ void send_data(google::protobuf::RpcController* controller,
+ const PSendDataRequest* request,
+ PSendDataResult* response,
+ google::protobuf::Closure* done);
+ void commit(google::protobuf::RpcController* controller,
+ const PCommitRequest* request,
+ PCommitResult* response,
+ google::protobuf::Closure* done);
+ void rollback(google::protobuf::RpcController* controller,
+ const PRollbackRequest* request,
+ PRollbackResult* response,
+ google::protobuf::Closure* done);
void fold_constant_expr(google::protobuf::RpcController* controller,
const PConstantExprRequest* request,
PConstantExprResult* response,
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BEGIN.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BEGIN.md
new file mode 100644
index 0000000..069eeeb
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BEGIN.md
@@ -0,0 +1,92 @@
+---
+{
+ "title": "BEGIN",
+ "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# BEGIN, COMMIT, ROLLBACK
+## Description
+### Syntax
+
+```
+BEGIN;
+INSERT INTO table_name ...
+COMMIT;
+```
+```
+BEGIN [ WITH LABEL label];
+INSERT INTO table_name ...
+ROLLBACK;
+```
+### Parameters
+
+> label: an optional label (a string) that identifies this transaction. If it is omitted, a label is generated automatically.
+
+### Note
+
+A transaction can only contain INSERT statements; UPDATE and DELETE are not supported. You can check the state of a transaction with `SHOW TRANSACTION WHERE LABEL = 'label'`.
+
+## example
+
+1. Begin a transaction without a label, then commit it
+
+```
+BEGIN;
+INSERT INTO test VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, DEFAULT);
+INSERT INTO test (c1) VALUES (1);
+COMMIT;
+```
+
+All the data in the sql between `begin` and `commit` will be inserted into the table.
+
+2. Begin a transaction without a label, then abort it
+
+```
+BEGIN;
+INSERT INTO test VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, DEFAULT);
+INSERT INTO test (c1) VALUES (1);
+ROLLBACK;
+```
+
+All the data in the sql between `begin` and `rollback` will be aborted, nothing will be inserted into the table.
+
+3. Begin a transaction with a label, then commit it
+
+```
+BEGIN WITH LABEL test_label1;
+INSERT INTO test VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, DEFAULT);
+INSERT INTO test (c1) VALUES (1);
+COMMIT;
+```
+
+All the data in the sql between `begin` and `commit` will be inserted into the table.
+The label of `test_label1` will be set to mark this transaction. You can check this transaction by `SHOW TRANSACTION WHERE LABEL = 'test_label1'`.
+
+## keyword
+BEGIN, COMMIT, ROLLBACK
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md
index 0639f15..310636d 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md
@@ -27,14 +27,16 @@ under the License.
# SHOW TRANSACTION
## description
-This syntax is used to view transaction details for the specified transaction id.
+This syntax is used to view transaction details for the specified transaction id or label name.
grammar:
```
SHOW TRANSACTION
[FROM db_name]
-WHERE id = transaction_id;
+WHERE
+{ id = transaction_id
+| label = 'label_name' };
```
Example return result:
@@ -81,6 +83,10 @@ ErrorReplicasCount: 0
SHOW TRANSACTION FROM db WHERE ID = 4005;
+3. View the transaction with label `label_name`:
+
+ SHOW TRANSACTION WHERE LABEL = 'label_name';
+
## keyword
SHOW, TRANSACTION
\ No newline at end of file
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BEGIN.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BEGIN.md
new file mode 100644
index 0000000..a286a3f
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BEGIN.md
@@ -0,0 +1,93 @@
+---
+{
+ "title": "BEGIN",
+ "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# BEGIN, COMMIT, ROLLBACK
+## Description
+### Syntax
+
+```
+BEGIN;
+INSERT INTO table_name ...
+COMMIT;
+```
+
+```
+BEGIN [ WITH LABEL label];
+INSERT INTO table_name ...
+ROLLBACK;
+```
+### Parameters
+
+> label: 用于指定当前事务的标签名。
+
+### Note
+
+事务只能对insert使用,而不能对update和delete使用,当指定标签时,可通过以下命令检查事务的运行状态: `SHOW TRANSACTION WHERE LABEL = 'label'`
+
+## example
+
+1. 开启一个事务,不指定标签,执行insert后提交。
+
+```
+BEGIN;
+INSERT INTO test VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, DEFAULT);
+INSERT INTO test (c1) VALUES (1);
+COMMIT;
+```
+
+所有在`begin`和`commit`之间的数据会被插入到test表中。
+
+2. 开启一个事务,不指定标签,执行insert后,回滚。
+
+```
+BEGIN;
+INSERT INTO test VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, DEFAULT);
+INSERT INTO test (c1) VALUES (1);
+ROLLBACK;
+```
+
+所有在`begin`和`rollback`之间的数据会取消,没有任何数据插入到test表中。
+
+3. 开启一个事务,指定标签为test_label1,执行insert后提交。
+
+```
+BEGIN WITH LABEL test_label1;
+INSERT INTO test VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, DEFAULT);
+INSERT INTO test (c1) VALUES (1);
+COMMIT;
+```
+
+所有在`begin`和`commit`之间的数据会被插入到test表中。
+标签`test_label1`用于标记该事务,可以通过以下命令来检查事务的状态:`SHOW TRANSACTION WHERE LABEL = 'test_label1'`。
+
+## keyword
+BEGIN, COMMIT, ROLLBACK
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md
index ad9dabf..f3e83b5 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md
@@ -27,14 +27,16 @@ under the License.
# SHOW TRANSACTION
## description
-该语法用于查看指定 transaction id 的事务详情。
+该语法用于查看指定 transaction id 或 label 的事务详情。
语法:
```
SHOW TRANSACTION
[FROM db_name]
-WHERE id = transaction_id;
+WHERE
+{ id = transaction_id
+| label = 'label_name' };
```
返回结果示例:
@@ -81,6 +83,9 @@ ErrorReplicasCount: 0
SHOW TRANSACTION FROM db WHERE ID=4005;
+3. 查看 label 为 label_name的事务:
+ SHOW TRANSACTION WHERE LABEL = 'label_name';
+
## keyword
SHOW, TRANSACTION
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index caa9ade..c3c3ed1 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -292,10 +292,11 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt,
show_routine_load_stmt, show_routine_load_task_stmt, show_create_routine_load_stmt,
describe_stmt, alter_stmt,
use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
- link_stmt, migrate_stmt, enter_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
+ link_stmt, migrate_stmt, enter_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt,
import_preceding_filter_stmt;
+nonterminal String transaction_label;
nonterminal ImportColumnDesc import_column_desc;
nonterminal List<ImportColumnDesc> import_column_descs;
@@ -689,6 +690,8 @@ stmt ::=
{: RESULT = stmt; :}
| restore_stmt : stmt
{: RESULT = stmt; :}
+ | transaction_stmt : stmt
+ {: RESULT = stmt; :}
| unsupported_stmt : stmt
{: RESULT = stmt; :}
| export_stmt : stmt
@@ -4810,21 +4813,39 @@ truncate_stmt ::=
:}
;
-unsupported_stmt ::=
- KW_START KW_TRANSACTION opt_with_consistent_snapshot:v
+transaction_stmt ::=
+ KW_BEGIN
{:
- RESULT = new UnsupportedStmt();
+ RESULT = new TransactionBeginStmt();
:}
- | KW_BEGIN opt_work:work
+ | KW_BEGIN KW_WITH KW_LABEL transaction_label:label
{:
- RESULT = new UnsupportedStmt();
+ RESULT = new TransactionBeginStmt(label);
:}
| KW_COMMIT opt_work opt_chain opt_release
{:
- RESULT = new UnsupportedStmt();
+ RESULT = new TransactionCommitStmt();
:}
| KW_ROLLBACK opt_work opt_chain opt_release
{:
+ RESULT = new TransactionRollbackStmt();
+ :}
+ ;
+
+transaction_label ::=
+ /* empty */
+ {:
+ RESULT = null;
+ :}
+ | ident:label
+ {:
+ RESULT = label;
+ :}
+ ;
+
+unsupported_stmt ::=
+ KW_START KW_TRANSACTION opt_with_consistent_snapshot:v
+ {:
RESULT = new UnsupportedStmt();
:}
;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
index 2823d98..b6b503f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
@@ -279,6 +279,8 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
return isAnalyzed;
}
+ public void checkValueValid() throws AnalysisException {}
+
public ExprId getId() {
return id;
}
@@ -1779,4 +1781,5 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
}
return false;
}
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 8c6e0d6..3b961bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -187,6 +187,10 @@ public class InsertStmt extends DdlStmt {
return tblName.getDb();
}
+ public String getTbl() {
+ return tblName.getTbl();
+ }
+
public void getTables(Analyzer analyzer, Map<Long, Table> tableMap, Set<String> parentViewNameSet) throws AnalysisException {
// get dbs of statement
queryStmt.getTables(analyzer, tableMap, parentViewNameSet);
@@ -291,6 +295,10 @@ public class InsertStmt extends DdlStmt {
analyzePlanHints(analyzer);
+ if (analyzer.getContext().isTxnModel()) {
+ return;
+ }
+
// create data sink
createDataSink();
@@ -298,10 +306,10 @@ public class InsertStmt extends DdlStmt {
// create label and begin transaction
long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
+ if (Strings.isNullOrEmpty(label)) {
+ label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId());
+ }
if (!isExplain() && !isTransactionBegin) {
- if (Strings.isNullOrEmpty(label)) {
- label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId());
- }
if (targetTable instanceof OlapTable) {
LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
@@ -720,7 +728,9 @@ public class InsertStmt extends DdlStmt {
if (col.getDataType().equals(expr.getType().getPrimitiveType())) {
return expr;
}
- return expr.castTo(col.getType());
+ Expr newExpr = expr.castTo(col.getType());
+ newExpr.checkValueValid();
+ return newExpr;
}
public void prepareExpressions() throws UserException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java
index 64b3973..65aee31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java
@@ -66,37 +66,7 @@ public class IntLiteral extends LiteralExpr {
public IntLiteral(long longValue, Type type) throws AnalysisException {
super();
- boolean valid = true;
- switch (type.getPrimitiveType()) {
- case TINYINT:
- if (longValue < TINY_INT_MIN || longValue > TINY_INT_MAX) {
- valid = false;
- }
- break;
- case SMALLINT:
- if (longValue < SMALL_INT_MIN || longValue > SMALL_INT_MAX) {
- valid = false;
- }
- break;
- case INT:
- if (longValue < INT_MIN || longValue > INT_MAX) {
- valid = false;
- }
- break;
- case BIGINT:
- if (longValue < BIG_INT_MIN) {
- valid = false;
- }
- // no need to check upper bound
- break;
- default:
- valid = false;
- break;
- }
-
- if (!valid) {
- throw new AnalysisException("Number out of range[" + value + "]. type: " + type);
- }
+ checkValueValid(longValue, type);
this.value = longValue;
this.type = type;
@@ -107,11 +77,28 @@ public class IntLiteral extends LiteralExpr {
super();
long longValue = -1L;
try {
- longValue = Long.valueOf(value);
+ longValue = Long.parseLong(value);
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid number format: " + value);
}
+ checkValueValid(longValue, type);
+
+ this.value = longValue;
+ this.type = type;
+ analysisDone();
+ }
+
+ protected IntLiteral(IntLiteral other) {
+ super(other);
+ value = other.value;
+ }
+
+ @Override
+ public void checkValueValid() throws AnalysisException {
+ checkValueValid(value, type);
+ }
+ private void checkValueValid(long longValue, Type type) throws AnalysisException {
boolean valid = true;
switch (type.getPrimitiveType()) {
case TINYINT:
@@ -141,17 +128,8 @@ public class IntLiteral extends LiteralExpr {
}
if (!valid) {
- throw new AnalysisException("Number out of range[" + value + "]. type: " + type);
+ throw new AnalysisException("Number out of range[" + longValue + "]. type: " + type);
}
-
- this.value = longValue;
- this.type = type;
- analysisDone();
- }
-
- protected IntLiteral(IntLiteral other) {
- super(other);
- value = other.value;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTransactionStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTransactionStmt.java
index 0115247..2c42cf5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTransactionStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTransactionStmt.java
@@ -40,7 +40,8 @@ public class ShowTransactionStmt extends ShowStmt {
private String dbName;
private Expr whereClause;
- private long txnId;
+ private long txnId = -1;
+ private String label = "";
public ShowTransactionStmt(String dbName, Expr whereClause) {
this.dbName = dbName;
@@ -55,6 +56,10 @@ public class ShowTransactionStmt extends ShowStmt {
return txnId;
}
+ public String getLabel() {
+ return label;
+ }
+
@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
super.analyze(analyzer);
@@ -99,22 +104,17 @@ public class ShowTransactionStmt extends ShowStmt {
break CHECK;
}
String leftKey = ((SlotRef) whereClause.getChild(0)).getColumnName();
- if (!leftKey.equalsIgnoreCase("id")) {
- valid = false;
- break CHECK;
- }
-
- // right child
- if (!(whereClause.getChild(1) instanceof IntLiteral)) {
+ if (leftKey.equalsIgnoreCase("id") && (whereClause.getChild(1) instanceof IntLiteral)) {
+ txnId = ((IntLiteral) whereClause.getChild(1)).getLongValue();
+ } else if (leftKey.equalsIgnoreCase("label") && (whereClause.getChild(1) instanceof StringLiteral)) {
+ label = ((StringLiteral) whereClause.getChild(1)).getStringValue();
+ } else {
valid = false;
- break CHECK;
}
-
- txnId = ((IntLiteral) whereClause.getChild(1)).getLongValue();
}
if (!valid) {
- throw new AnalysisException("Where clause should looks like: id = 123");
+ throw new AnalysisException("Where clause should looks like one of them: id = 123 or label = 'label'");
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionBeginStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionBeginStmt.java
new file mode 100644
index 0000000..7e631b5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionBeginStmt.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.transaction.TransactionEntry;
+
+public class TransactionBeginStmt extends TransactionStmt { // statement object for BEGIN [WITH LABEL label]; analyze() stores the label on the analyzer context's transaction entry
+ private String label = null; // transaction label; analyze() generates one when it is null or empty
+ public TransactionBeginStmt() {
+ this.label = ""; // no label supplied; analyze() will generate one
+ }
+ public TransactionBeginStmt(final String label) {
+ this.label = label;
+ }
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+ if (label == null || label.isEmpty()) { // no user-supplied label
+ label = "txn_insert_" + DebugUtil.printId(analyzer.getContext().queryId()); // derive a label from the current query id
+ }
+ if (analyzer.getContext().getTxnEntry() == null) { // context has no transaction entry yet: create one
+ analyzer.getContext().setTxnEntry(new TransactionEntry());
+ }
+ analyzer.getContext().getTxnEntry().setLabel(label); // record the label on the context's transaction entry
+ super.analyze(analyzer);
+ }
+
+}
\ No newline at end of file
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionCommitStmt.java
similarity index 53%
copy from be/src/runtime/stream_load/stream_load_executor.h
copy to fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionCommitStmt.java
index 127e710..08bca18 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionCommitStmt.java
@@ -15,34 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
+package org.apache.doris.analysis;
-namespace doris {
+public class TransactionCommitStmt extends TransactionStmt {
-class ExecEnv;
-class StreamLoadContext;
-class Status;
-class TTxnCommitAttachment;
-
-class StreamLoadExecutor {
-public:
- StreamLoadExecutor(ExecEnv* exec_env) : _exec_env(exec_env) {}
-
- Status begin_txn(StreamLoadContext* ctx);
-
- Status commit_txn(StreamLoadContext* ctx);
-
- void rollback_txn(StreamLoadContext* ctx);
-
- Status execute_plan_fragment(StreamLoadContext* ctx);
-
-private:
- // collect the load statistics from context and set them to stat
- // return true if stat is set, otherwise, return false
- bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment);
-
-private:
- ExecEnv* _exec_env;
-};
-
-} // namespace doris
+}
\ No newline at end of file
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionRollbackStmt.java
similarity index 53%
copy from be/src/runtime/stream_load/stream_load_executor.h
copy to fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionRollbackStmt.java
index 127e710..d62028e 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionRollbackStmt.java
@@ -15,34 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
+package org.apache.doris.analysis;
-namespace doris {
+public class TransactionRollbackStmt extends TransactionStmt {
-class ExecEnv;
-class StreamLoadContext;
-class Status;
-class TTxnCommitAttachment;
-
-class StreamLoadExecutor {
-public:
- StreamLoadExecutor(ExecEnv* exec_env) : _exec_env(exec_env) {}
-
- Status begin_txn(StreamLoadContext* ctx);
-
- Status commit_txn(StreamLoadContext* ctx);
-
- void rollback_txn(StreamLoadContext* ctx);
-
- Status execute_plan_fragment(StreamLoadContext* ctx);
-
-private:
- // collect the load statistics from context and set them to stat
- // return true if stat is set, otherwise, return false
- bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment);
-
-private:
- ExecEnv* _exec_env;
-};
-
-} // namespace doris
+}
\ No newline at end of file
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionStmt.java
similarity index 53%
copy from be/src/runtime/stream_load/stream_load_executor.h
copy to fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionStmt.java
index 127e710..1aeb635 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionStmt.java
@@ -15,34 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
+package org.apache.doris.analysis;
-namespace doris {
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
-class ExecEnv;
-class StreamLoadContext;
-class Status;
-class TTxnCommitAttachment;
+public class TransactionStmt extends StatementBase {
-class StreamLoadExecutor {
-public:
- StreamLoadExecutor(ExecEnv* exec_env) : _exec_env(exec_env) {}
+ @Override
+ public RedirectStatus getRedirectStatus() {
+ return RedirectStatus.NO_FORWARD;
+ }
- Status begin_txn(StreamLoadContext* ctx);
-
- Status commit_txn(StreamLoadContext* ctx);
-
- void rollback_txn(StreamLoadContext* ctx);
-
- Status execute_plan_fragment(StreamLoadContext* ctx);
-
-private:
- // collect the load statistics from context and set them to stat
- // return true if stat is set, otherwise, return false
- bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment);
-
-private:
- ExecEnv* _exec_env;
-};
-
-} // namespace doris
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+ super.analyze(analyzer);
+ }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index b77a8ce..3f0155e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -433,6 +433,13 @@ public class Config extends ConfigBase {
public static int publish_version_timeout_second = 30; // 30 seconds
/**
+ * Maximum time, in seconds, to wait for all of a transaction's data to be inserted before the transaction is committed.
+ * This is the timeout for the "commit" command.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int commit_timeout_second = 30; // 30 seconds
+
+ /**
* minimal intervals between two publish version action
*/
@ConfField public static int publish_version_interval_ms = 10;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 6fd8d6e..86b2fd2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -225,7 +225,7 @@ public class Planner {
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());
}
- if (statement instanceof InsertStmt) {
+ if (statement instanceof InsertStmt && !analyzer.getContext().isTxnModel()) {
InsertStmt insertStmt = (InsertStmt) statement;
rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments);
rootFragment.setSink(insertStmt.getDataSink());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index d762526..8f1dad8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -19,7 +19,9 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.UserException;
import org.apache.doris.mysql.MysqlCapability;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
@@ -27,6 +29,7 @@ import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.TransactionEntry;
import com.google.common.collect.Lists;
@@ -64,6 +67,9 @@ public class ConnectContext {
protected volatile boolean isKilled;
// Db
protected volatile String currentDb = "";
+ protected volatile long currentDbId = -1;
+ // Transaction
+ protected volatile TransactionEntry txnEntry = null;
// cluster name
protected volatile String clusterName = "";
// username@host of current login user
@@ -154,6 +160,30 @@ public class ConnectContext {
queryDetail = null;
}
+ public boolean isTxnModel() { // false when this context has no transaction entry
+ return txnEntry != null && txnEntry.isTxnModel(); // otherwise delegates to the entry
+ }
+ public boolean isTxnIniting() { // false when this context has no transaction entry
+ return txnEntry != null && txnEntry.isTxnIniting(); // otherwise delegates to the entry
+ }
+ public boolean isTxnBegin() { // false when this context has no transaction entry
+ return txnEntry != null && txnEntry.isTxnBegin(); // otherwise delegates to the entry
+ }
+ public void closeTxn() { // when isTxnModel(): aborts the transaction if isTxnBegin(), then clears txnEntry; otherwise does nothing
+ if (isTxnModel()) {
+ if (isTxnBegin()) {
+ try {
+ Catalog.getCurrentGlobalTransactionMgr().abortTransaction( // NOTE(review): "timeout" (presumably the abort reason) is passed unconditionally
+ currentDbId, txnEntry.getTxnConf().getTxnId(), "timeout");
+ } catch (UserException e) { // best effort: a failed abort is only logged, not rethrown
+ LOG.error("db: {}, txnId: {}, rollback error.", currentDb,
+ txnEntry.getTxnConf().getTxnId(), e);
+ }
+ }
+ txnEntry = null; // cleared even when the abort above threw UserException
+ }
+ }
+
// Just for unit test
public void resetSessionVariables() {
sessionVariable = VariableMgr.newSessionVariable();
@@ -199,6 +229,18 @@ public class ConnectContext {
threadLocalInfo.set(this);
}
+ public long getCurrentDbId() {
+ return currentDbId;
+ }
+
+ public TransactionEntry getTxnEntry() {
+ return txnEntry;
+ }
+
+ public void setTxnEntry(TransactionEntry txnEntry) {
+ this.txnEntry = txnEntry;
+ }
+
public TResourceInfo toResourceCtx() {
return new TResourceInfo(qualifiedUser, sessionVariable.getResourceGroup());
}
@@ -315,6 +357,12 @@ public class ConnectContext {
public void setDatabase(String db) {
currentDb = db;
+ Database database = Catalog.getCurrentCatalog().getDb(db); // look up the database by name so its id can be cached alongside the name
+ if (database == null) {
+ currentDbId = -1; // lookup returned null: fall back to -1, the field's initial value
+ } else {
+ currentDbId = database.getId();
+ }
}
public void setExecutor(StmtExecutor executor) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index ae838f2..80f7a92 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -113,6 +113,7 @@ public class ConnectScheduler {
}
public synchronized void unregisterConnection(ConnectContext ctx) {
+ ctx.closeTxn();
if (connectionMap.remove((long) ctx.getConnectionId()) != null) {
numberConnection--;
AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
new file mode 100644
index 0000000..9106006
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.StreamLoadPlanner;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.task.StreamLoadTask;
+import org.apache.doris.thrift.TBrokerRangeDesc;
+import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TScanRangeParams;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TStreamLoadPutRequest;
+import org.apache.doris.thrift.TTxnParams;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.TransactionEntry;
+
+import org.apache.thrift.TException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class InsertStreamTxnExecutor {
+ private long txnId;
+ private TUniqueId loadId;
+ private TransactionEntry txnEntry;
+
+ public InsertStreamTxnExecutor(TransactionEntry txnEntry) {
+ this.txnEntry = txnEntry;
+ }
+
+ public void beginTransaction(TStreamLoadPutRequest request) throws UserException, TException, TimeoutException,
+ InterruptedException, ExecutionException {
+ TTxnParams txnConf = txnEntry.getTxnConf();
+ StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
+ StreamLoadPlanner planner = new StreamLoadPlanner(txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask);
+ TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
+ List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIds(
+ 1, true, true, txnEntry.getDb().getClusterName());
+ if (beIds == null || beIds.isEmpty()) {
+ throw new UserException("there is no scanNode Backend.");
+ }
+
+ tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
+ for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
+ for (TScanRangeParams scanRangeParams : entry.getValue()) {
+ for (TBrokerRangeDesc desc : scanRangeParams.scan_range.broker_scan_range.ranges) {
+ desc.setFormatType(TFileFormatType.FORMAT_PROTO);
+ }
+ }
+ }
+ txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
+
+ Backend backend = Catalog.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
+ txnConf.setUserIp(backend.getHost());
+ txnEntry.setBackend(backend);
+ TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+ try {
+ Future<InternalService.PExecPlanFragmentResult> future = BackendServiceProxy.getInstance().execPlanFragmentAsync(
+ address, tRequest);
+ InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
+ TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code != TStatusCode.OK) {
+ throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
+ }
+ } catch (RpcException e) {
+ throw new TException(e);
+ }
+ }
+
+ public void commitTransaction() throws TException, TimeoutException,
+ InterruptedException, ExecutionException {
+ TTxnParams txnConf = txnEntry.getTxnConf();
+ Types.PUniqueId fragmentInstanceId = Types.PUniqueId.newBuilder()
+ .setHi(txnConf.getFragmentInstanceId().getHi())
+ .setLo(txnConf.getFragmentInstanceId().getLo()).build();
+
+ Backend backend = txnEntry.getBackend();
+ TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+ try {
+ Future<InternalService.PCommitResult> future = BackendServiceProxy.getInstance().commit(address, fragmentInstanceId);
+ InternalService.PCommitResult result = future.get(5, TimeUnit.SECONDS);
+ TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code != TStatusCode.OK) {
+ throw new TException("failed to commit txn: " + result.getStatus().getErrorMsgsList());
+ }
+ } catch (RpcException e) {
+ throw new TException(e);
+ }
+ }
+
+ public void abortTransaction() throws TException, TimeoutException,
+ InterruptedException, ExecutionException {
+ TTxnParams txnConf = txnEntry.getTxnConf();
+ Types.PUniqueId fragmentInstanceId = Types.PUniqueId.newBuilder()
+ .setHi(txnConf.getFragmentInstanceId().getHi())
+ .setLo(txnConf.getFragmentInstanceId().getLo()).build();
+
+ Backend be = txnEntry.getBackend();
+ TNetworkAddress address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+ try {
+ Future<InternalService.PRollbackResult> future = BackendServiceProxy.getInstance().rollback(address,
+ fragmentInstanceId);
+ InternalService.PRollbackResult result = future.get(5, TimeUnit.SECONDS);
+ TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code != TStatusCode.OK) {
+ throw new TException("failed to rollback txn: " + result.getStatus().getErrorMsgsList());
+ }
+ } catch (RpcException e) {
+ throw new TException(e);
+ }
+ }
+
+ public void sendData() throws TException, TimeoutException,
+ InterruptedException, ExecutionException {
+ if (txnEntry.getDataToSend() == null || txnEntry.getDataToSend().isEmpty()) {
+ return;
+ }
+
+ TTxnParams txnConf = txnEntry.getTxnConf();
+ Types.PUniqueId fragmentInstanceId = Types.PUniqueId.newBuilder()
+ .setHi(txnConf.getFragmentInstanceId().getHi())
+ .setLo(txnConf.getFragmentInstanceId().getLo()).build();
+
+ Backend backend = txnEntry.getBackend();
+ TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+ try {
+ Future<InternalService.PSendDataResult> future = BackendServiceProxy.getInstance().sendData(
+ address, fragmentInstanceId, txnEntry.getDataToSend());
+ InternalService.PSendDataResult result = future.get(5, TimeUnit.SECONDS);
+ TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code != TStatusCode.OK) {
+ throw new TException("failed to insert data: " + result.getStatus().getErrorMsgsList());
+ }
+ txnEntry.clearDataToSend();
+ } catch (RpcException e) {
+ throw new TException(e);
+ }
+ }
+
+ public TUniqueId getLoadId() {
+ return loadId;
+ }
+
+ public void setLoadId(TUniqueId loadId) {
+ this.loadId = loadId;
+ }
+
+ public long getTxnId() {
+ return txnId;
+ }
+
+ public void setTxnId(long txnId) {
+ this.txnId = txnId;
+ }
+
+ public TransactionEntry getTxnEntry() {
+ return txnEntry;
+ }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterTxnExecutor.java
new file mode 100644
index 0000000..917c4cc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterTxnExecutor.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TLoadTxnBeginRequest;
+import org.apache.doris.thrift.TLoadTxnBeginResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TWaitingTxnStatusRequest;
+import org.apache.doris.thrift.TWaitingTxnStatusResult;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+
+ /**
+  * Forwards transaction RPCs (begin, wait-for-status) to the master frontend over thrift,
+  * using pooled {@link FrontendService.Client} connections. Each RPC is retried once on a
+  * transport error other than a timeout, after the connection has been reopened.
+  */
+ public class MasterTxnExecutor {
+     private static final Logger LOG = LogManager.getLogger(MasterTxnExecutor.class);
+
+     // Derived from the session query timeout; not read anywhere in this class.
+     private int waitTimeoutMs;
+     // the total time of thrift connectTime add readTime and writeTime
+     private int thriftTimeoutMs;
+     private ConnectContext ctx;
+
+     public MasterTxnExecutor(ConnectContext ctx) {
+         this.ctx = ctx;
+         this.waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
+         this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
+     }
+
+     // Resolves the master's thrift address from the catalog; fails while the catalog is not ready.
+     private TNetworkAddress getMasterAddress() throws TException {
+         if (!ctx.getCatalog().isReady()) {
+             throw new TException("Node catalog is not ready, please wait for a while.");
+         }
+         String masterHost = ctx.getCatalog().getMasterIp();
+         int masterRpcPort = ctx.getCatalog().getMasterRpcPort();
+         return new TNetworkAddress(masterHost, masterRpcPort);
+     }
+
+     // Borrows a pooled client for the given address, wrapping any failure in a TException.
+     private FrontendService.Client getClient(TNetworkAddress thriftAddress) throws TException {
+         try {
+             return ClientPool.frontendPool.borrowObject(thriftAddress, thriftTimeoutMs);
+         } catch (Exception e) {
+             // may throw NullPointerException. add err msg
+             throw new TException("Failed to get master client.", e);
+         }
+     }
+
+     /**
+      * Reopens the client after a transport failure so the request can be sent once more.
+      * Rethrows {@code e} when the reopen fails or when the failure was a timeout, in which
+      * case no retry is attempted.
+      */
+     private void reopenForRetry(FrontendService.Client client, TTransportException e) throws TTransportException {
+         boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs);
+         if (!ok) {
+             throw e;
+         }
+         if (e.getType() == TTransportException.TIMED_OUT) {
+             throw e;
+         }
+     }
+
+     // Hands a healthy client back to the pool, or discards one whose RPC did not complete.
+     private void releaseClient(TNetworkAddress thriftAddress, FrontendService.Client client, boolean isReturnToPool) {
+         if (isReturnToPool) {
+             ClientPool.frontendPool.returnObject(thriftAddress, client);
+         } else {
+             ClientPool.frontendPool.invalidateObject(thriftAddress, client);
+         }
+     }
+
+     /**
+      * Sends a begin-transaction request to the master.
+      *
+      * @return the master's result; its status is always OK
+      * @throws TException if the master is unreachable or answers with a non-OK status,
+      *         on the first attempt as well as on the retry
+      */
+     public TLoadTxnBeginResult beginTxn(TLoadTxnBeginRequest request) throws TException {
+         TNetworkAddress thriftAddress = getMasterAddress();
+
+         FrontendService.Client client = getClient(thriftAddress);
+
+         LOG.info("Send begin transaction {} to Master {}", ctx.getStmtId(), thriftAddress);
+
+         boolean isReturnToPool = false;
+         try {
+             TLoadTxnBeginResult result;
+             try {
+                 result = client.loadTxnBegin(request);
+             } catch (TTransportException e) {
+                 reopenForRetry(client, e);
+                 result = client.loadTxnBegin(request);
+             }
+             // The round trip completed, so the connection is reusable whatever the status says.
+             isReturnToPool = true;
+             if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+                 throw new TException("begin txn failed.");
+             }
+             return result;
+         } finally {
+             releaseClient(thriftAddress, client, isReturnToPool);
+         }
+     }
+
+     /**
+      * Asks the master for the status of a transaction.
+      *
+      * @return the master's result; its status is always OK
+      * @throws TException if the master is unreachable or answers with a non-OK status
+      */
+     public TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws TException {
+         TNetworkAddress thriftAddress = getMasterAddress();
+
+         FrontendService.Client client = getClient(thriftAddress);
+
+         LOG.info("Send waiting transaction status {} to Master {}", ctx.getStmtId(), thriftAddress);
+
+         boolean isReturnToPool = false;
+         try {
+             TWaitingTxnStatusResult result;
+             try {
+                 result = client.waitingTxnStatus(request);
+             } catch (TTransportException e) {
+                 reopenForRetry(client, e);
+                 result = client.waitingTxnStatus(request);
+             }
+             // The round trip completed, so the connection is reusable whatever the status says.
+             isReturnToPool = true;
+             if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+                 throw new TException("get txn status failed.");
+             }
+             return result;
+         } finally {
+             releaseClient(thriftAddress, client, isReturnToPool);
+         }
+     }
+ }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index fecc3e1..8c54369 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -1853,8 +1853,15 @@ public class ShowExecutor {
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName());
}
- long txnId = showStmt.getTxnId();
+ Long txnId = showStmt.getTxnId();
+ String label = showStmt.getLabel();
GlobalTransactionMgr transactionMgr = Catalog.getCurrentGlobalTransactionMgr();
+ if (!label.isEmpty()) {
+ txnId = transactionMgr.getTransactionId(db.getId(), label);
+ if (txnId == null) {
+ throw new AnalysisException("transaction with label " + label + " does not exist");
+ }
+ }
resultSet = new ShowResultSet(showStmt.getMetaData(), transactionMgr.getSingleTranInfo(db.getId(), txnId));
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index f9b0ca9..5938712 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.ExportStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.KillStmt;
+import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.RedirectStatus;
@@ -38,10 +39,15 @@ import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StmtRewriter;
import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.TransactionBeginStmt;
+import org.apache.doris.analysis.TransactionCommitStmt;
+import org.apache.doris.analysis.TransactionRollbackStmt;
+import org.apache.doris.analysis.TransactionStmt;
import org.apache.doris.analysis.UnsupportedStmt;
import org.apache.doris.analysis.UseStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
@@ -80,13 +86,25 @@ import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadEtlTask;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TLoadTxnBeginRequest;
+import org.apache.doris.thrift.TLoadTxnBeginResult;
+import org.apache.doris.thrift.TMergeType;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TResultBatch;
+import org.apache.doris.thrift.TStreamLoadPutRequest;
+import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.thrift.TWaitingTxnStatusRequest;
+import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionCommitFailedException;
+import org.apache.doris.transaction.TransactionEntry;
+import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Strings;
@@ -96,6 +114,7 @@ import com.google.protobuf.ByteString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
import org.glassfish.jersey.internal.guava.Sets;
import java.io.IOException;
@@ -105,6 +124,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -115,6 +136,7 @@ public class StmtExecutor implements ProfileWriter {
private static final Logger LOG = LogManager.getLogger(StmtExecutor.class);
private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
+ private static final int MAX_DATA_TO_SEND_FOR_TXN = 100;
private ConnectContext context;
private MysqlSerializer serializer;
@@ -264,22 +286,28 @@ public class StmtExecutor implements ProfileWriter {
context.setQueryId(queryId);
try {
+ if (context.isTxnModel() && !(parsedStmt instanceof InsertStmt)
+ && !(parsedStmt instanceof TransactionStmt)) {
+ throw new TException("This is in a transaction, only insert, commit, rollback is acceptable.");
+ }
// support select hint e.g. select /*+ SET_VAR(query_timeout=1) */ sleep(3);
analyzeVariablesInStmt();
- // analyze this query
- SessionVariable sessionVariable = context.getSessionVariable();
- analyze(sessionVariable.toThrift());
-
- if (isForwardToMaster()) {
- forwardToMaster();
- if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) {
- // If the query id changed in master, we set it in context.
- // WARN: when query timeout, this code may not be reach.
- context.setQueryId(masterOpExecutor.getQueryId());
+
+ if (!context.isTxnModel()) {
+ // analyze this query
+ analyze(context.getSessionVariable().toThrift());
+ if (isForwardToMaster()) {
+ forwardToMaster();
+ if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) {
+ context.setQueryId(masterOpExecutor.getQueryId());
+ }
+ return;
+ } else {
+ LOG.debug("no need to transfer to Master. stmt: {}", context.getStmtId());
}
- return;
} else {
- LOG.debug("no need to transfer to Master. stmt: {}", context.getStmtId());
+ analyzer = new Analyzer(context.getCatalog(), context);
+ parsedStmt.analyze(analyzer);
}
if (parsedStmt instanceof QueryStmt) {
@@ -320,6 +348,8 @@ public class StmtExecutor implements ProfileWriter {
handleEnterStmt();
} else if (parsedStmt instanceof UseStmt) {
handleUseStmt();
+ } else if (parsedStmt instanceof TransactionStmt) {
+ handleTransactionStmt();
} else if (parsedStmt instanceof CreateTableAsSelectStmt) {
handleInsertStmt();
} else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InserStmt is its subclass
@@ -378,7 +408,7 @@ public class StmtExecutor implements ProfileWriter {
LOG.warn("failed to revert Session value.", e);
context.getState().setError(e.getMessage());
}
- if (parsedStmt instanceof InsertStmt) {
+ if (!context.isTxnModel() && parsedStmt instanceof InsertStmt) {
InsertStmt insertStmt = (InsertStmt) parsedStmt;
// The transaction of a insert operation begin at analyze phase.
// So we should abort the transaction at this finally block if it encounter exception.
@@ -852,6 +882,231 @@ public class StmtExecutor implements ProfileWriter {
plannerProfile.setQueryFetchResultFinishTime();
}
+
+ /**
+  * Fetches the waiting-transaction status, locally when this frontend is the master
+  * and through a {@link MasterTxnExecutor} otherwise.
+  */
+ private TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws Exception {
+     if (Catalog.getCurrentCatalog().isMaster()) {
+         return Catalog.getCurrentGlobalTransactionMgr().getWaitingTxnStatus(request);
+     }
+     // Not the master: forward the request over thrift.
+     return new MasterTxnExecutor(context).getWaitingTxnStatus(request);
+ }
+
+ private void handleTransactionStmt() throws Exception {
+ // Every time set no send flag and clean all data in buffer
+ context.getMysqlChannel().reset();
+ context.getState().setOk(0, 0, "");
+ // create plan
+ if (context.getTxnEntry() != null && context.getTxnEntry().getRowsInTransaction() == 0
+ && (parsedStmt instanceof TransactionCommitStmt || parsedStmt instanceof TransactionRollbackStmt)) {
+ context.setTxnEntry(null);
+ } else if (parsedStmt instanceof TransactionBeginStmt) {
+ if (context.isTxnModel()) {
+ LOG.info("A transaction has already begin");
+ return;
+ }
+ TTxnParams txnParams = new TTxnParams();
+ txnParams.setNeedTxn(true).setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("");
+ if (context.getSessionVariable().getEnableInsertStrict()) {
+ txnParams.setMaxFilterRatio(0);
+ } else {
+ txnParams.setMaxFilterRatio(1.0);
+ }
+ if (context.getTxnEntry() == null) {
+ context.setTxnEntry(new TransactionEntry());
+ }
+ TransactionEntry txnEntry = context.getTxnEntry();
+ txnEntry.setTxnConf(txnParams);
+ StringBuilder sb = new StringBuilder();
+ sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'")
+ .append(TransactionStatus.PREPARE.name());
+ sb.append("', 'txnId':'").append("'").append("}");
+ context.getState().setOk(0, 0, sb.toString());
+ } else if (parsedStmt instanceof TransactionCommitStmt) {
+ if (!context.isTxnModel()) {
+ LOG.info("No transaction to commit");
+ return;
+ }
+
+ TTxnParams txnConf = context.getTxnEntry().getTxnConf();
+ try {
+ InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(context.getTxnEntry());
+ if (context.getTxnEntry().getDataToSend().size() > 0) {
+ // send rest data
+ executor.sendData();
+ }
+ // commit txn
+ executor.commitTransaction();
+
+ // wait txn visible
+ TWaitingTxnStatusRequest request = new TWaitingTxnStatusRequest();
+ request.setDbId(txnConf.getDbId()).setTxnId(txnConf.getTxnId());
+ request.setLabelIsSet(false);
+ request.setTxnIdIsSet(true);
+
+ TWaitingTxnStatusResult statusResult = getWaitingTxnStatus(request);
+ TransactionStatus txnStatus = TransactionStatus.valueOf(statusResult.getTxnStatusId());
+ if (txnStatus == TransactionStatus.COMMITTED) {
+ throw new AnalysisException("transaction commit successfully, BUT data will be visible later.");
+ } else if (txnStatus != TransactionStatus.VISIBLE) {
+ String errMsg = "commit failed, rollback.";
+ if (statusResult.getStatus().isSetErrorMsgs()
+ && statusResult.getStatus().getErrorMsgs().size() > 0) {
+ errMsg = String.join(". ", statusResult.getStatus().getErrorMsgs());
+ }
+ throw new AnalysisException(errMsg);
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'")
+ .append(txnStatus.name()).append("', 'txnId':'")
+ .append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}");
+ context.getState().setOk(0, 0, sb.toString());
+ } catch (Exception e) {
+ throw new AnalysisException(e.getMessage());
+ } finally {
+ context.setTxnEntry(null);
+ }
+ } else if (parsedStmt instanceof TransactionRollbackStmt) {
+ if (!context.isTxnModel()) {
+ LOG.info("No transaction to rollback");
+ return;
+ }
+ try {
+ // abort txn
+ InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(context.getTxnEntry());
+ executor.abortTransaction();
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'")
+ .append(TransactionStatus.ABORTED.name()).append("', 'txnId':'")
+ .append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}");
+ context.getState().setOk(0, 0, sb.toString());
+ } catch (Exception e) {
+ throw new AnalysisException(e.getMessage());
+ } finally {
+ context.setTxnEntry(null);
+ }
+ } else {
+ throw new TException("parsedStmt type is not TransactionStmt");
+ }
+ }
+
+ /**
+  * Buffers the VALUES rows of an insert statement into the session transaction, starting
+  * the transaction on the first insert. Buffered rows are flushed to the backend whenever
+  * the buffer reaches MAX_DATA_TO_SEND_FOR_TXN entries.
+  *
+  * @param insertStmt the insert to execute; must target the transaction's table and use a SelectStmt
+  * @return the number of rows taken from the statement
+  * @throws TException if the statement targets another table, is not a SelectStmt,
+  *         or a row's column count differs from the table's base schema
+  */
+ public int executeForTxn(InsertStmt insertStmt)
+         throws UserException, TException, InterruptedException, ExecutionException, TimeoutException {
+     if (context.isTxnIniting()) { // first time, begin txn
+         beginTxn(insertStmt.getDb(), insertStmt.getTbl());
+     }
+     boolean sameTarget = context.getTxnEntry().getTxnConf().getDb().equals(insertStmt.getDb())
+             && context.getTxnEntry().getTxnConf().getTbl().equals(insertStmt.getTbl());
+     if (!sameTarget) {
+         throw new TException("Only one table can be inserted in one transaction.");
+     }
+
+     QueryStmt queryStmt = insertStmt.getQueryStmt();
+     if (!(queryStmt instanceof SelectStmt)) {
+         throw new TException("queryStmt is not SelectStmt, insert command error");
+     }
+     SelectStmt valuesStmt = (SelectStmt) queryStmt;
+     TransactionEntry entry = context.getTxnEntry();
+     int rowCount = 0;
+     if (valuesStmt.getValueList() != null) {
+         // the value columns are columns which are visible to user, so here we use
+         // getBaseSchema(), not getFullSchema()
+         int expectedColumns = entry.getTable().getBaseSchema(false).size();
+         // Validate every row first so that nothing is buffered when any row is malformed.
+         for (List<Expr> candidate : valuesStmt.getValueList().getRows()) {
+             if (candidate.size() != expectedColumns) {
+                 throw new TException("Column count doesn't match value count");
+             }
+         }
+         for (List<Expr> candidate : valuesStmt.getValueList().getRows()) {
+             rowCount++;
+             InternalService.PDataRow encoded = getRowStringValue(candidate);
+             if (encoded != null) {
+                 List<InternalService.PDataRow> pending = entry.getDataToSend();
+                 pending.add(encoded);
+                 if (pending.size() >= MAX_DATA_TO_SEND_FOR_TXN) {
+                     // send data
+                     new InsertStreamTxnExecutor(entry).sendData();
+                 }
+             }
+         }
+     }
+     entry.setRowsInTransaction(entry.getRowsInTransaction() + rowCount);
+     return rowCount;
+ }
+
+ /**
+  * Starts the transaction for the given table: resolves the database and table, obtains a
+  * transaction id (locally on the master, via RPC otherwise) and then starts the stream-load
+  * fragment on a backend through {@link InsertStreamTxnExecutor#beginTransaction}.
+  *
+  * @param dbName name of the database holding the target table
+  * @param tblName name of the target table
+  * @throws TException if the database or the table cannot be found
+  */
+ private void beginTxn(String dbName, String tblName) throws UserException, TException,
+         InterruptedException, ExecutionException, TimeoutException {
+     TransactionEntry entry = context.getTxnEntry();
+     TTxnParams conf = entry.getTxnConf();
+     long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
+     TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
+
+     Database database = Catalog.getCurrentCatalog().getDb(dbName);
+     if (database == null) {
+         throw new TException("database is invalid for dbName: " + dbName);
+     }
+     Table table = database.getTable(tblName);
+     if (table == null) {
+         throw new TException("table is invalid: " + tblName);
+     }
+     conf.setDbId(database.getId()).setTbl(tblName).setDb(dbName);
+     entry.setTable(table);
+     entry.setDb(database);
+
+     String label = entry.getLabel();
+     if (Catalog.getCurrentCatalog().isMaster()) {
+         // Master: open the transaction directly and reuse the auth code it generated.
+         TransactionState.TxnCoordinator coordinator = new TransactionState.TxnCoordinator(
+                 TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress());
+         long newTxnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
+                 conf.getDbId(), Lists.newArrayList(table.getId()),
+                 label, coordinator, sourceType, timeoutSecond);
+         conf.setTxnId(newTxnId);
+         conf.setAuthCodeUuid(Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
+                 conf.getDbId(), conf.getTxnId()).getAuthCode());
+     } else {
+         // Non-master: generate the auth code here and pass it to the master in the begin request.
+         String authCodeUuid = UUID.randomUUID().toString();
+         TLoadTxnBeginRequest beginRequest = new TLoadTxnBeginRequest();
+         beginRequest.setDb(conf.getDb()).setTbl(conf.getTbl()).setAuthCodeUuid(authCodeUuid)
+                 .setCluster(database.getClusterName()).setLabel(label).setUser("").setUserIp("").setPasswd("");
+         TLoadTxnBeginResult beginResult = new MasterTxnExecutor(context).beginTxn(beginRequest);
+         conf.setTxnId(beginResult.getTxnId());
+         conf.setAuthCodeUuid(authCodeUuid);
+     }
+
+     TStreamLoadPutRequest putRequest = new TStreamLoadPutRequest();
+     putRequest.setTxnId(conf.getTxnId()).setDb(conf.getDb())
+             .setTbl(conf.getTbl())
+             .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
+             .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(context.queryId());
+
+     // execute begin txn
+     new InsertStreamTxnExecutor(entry).beginTransaction(putRequest);
+ }
+
+ private static final String NULL_VALUE_FOR_LOAD = "\\N";
+
+ /**
+  * Encodes one VALUES row as a {@code PDataRow}, one column per expression.
+  * A {@link NullLiteral} is written as NULL_VALUE_FOR_LOAD; any other expression
+  * contributes its {@code getStringValue()}.
+  *
+  * @param cols the expressions of the row
+  * @return the encoded row, or null when {@code cols} is empty
+  */
+ public static InternalService.PDataRow getRowStringValue(List<Expr> cols) {
+     if (cols.isEmpty()) {
+         return null;
+     }
+     InternalService.PDataRow.Builder rowBuilder = InternalService.PDataRow.newBuilder();
+     for (Expr col : cols) {
+         String text = (col instanceof NullLiteral) ? NULL_VALUE_FOR_LOAD : col.getStringValue();
+         rowBuilder.addColBuilder().setValue(text);
+     }
+     return rowBuilder.build();
+ }
+
// Process a select statement.
private void handleInsertStmt() throws Exception {
// Every time set no send flag and clean all data in buffer
@@ -878,131 +1133,145 @@ public class StmtExecutor implements ProfileWriter {
long createTime = System.currentTimeMillis();
Throwable throwable = null;
-
- String label = insertStmt.getLabel();
- LOG.info("Do insert [{}] with query id: {}", label, DebugUtil.printId(context.queryId()));
-
+ long txnId = -1;
+ String label = "";
long loadedRows = 0;
int filteredRows = 0;
TransactionStatus txnStatus = TransactionStatus.ABORTED;
- try {
- coord = new Coordinator(context, analyzer, planner);
- coord.setQueryType(TQueryType.LOAD);
+ String errMsg = "";
+ if (context.isTxnModel()) {
+ if (insertStmt.getQueryStmt() instanceof SelectStmt) {
+ if (((SelectStmt) insertStmt.getQueryStmt()).getTableRefs().size() > 0) {
+ throw new TException("Insert into ** select is not supported in a transaction");
+ }
+ }
+ txnStatus = TransactionStatus.PREPARE;
+ loadedRows = executeForTxn(insertStmt);
+ label = context.getTxnEntry().getLabel();
+ txnId = context.getTxnEntry().getTxnConf().getTxnId();
+ } else {
+ label = insertStmt.getLabel();
+ LOG.info("Do insert [{}] with query id: {}", label, DebugUtil.printId(context.queryId()));
- QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
+ try {
+ coord = new Coordinator(context, analyzer, planner);
+ coord.setQueryType(TQueryType.LOAD);
- coord.exec();
+ QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
- coord.join(context.getSessionVariable().getQueryTimeoutS());
- if (!coord.isDone()) {
- coord.cancel();
- ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
- }
+ coord.exec();
- if (!coord.getExecStatus().ok()) {
- String errMsg = coord.getExecStatus().getErrorMsg();
- LOG.warn("insert failed: {}", errMsg);
- ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
- }
+ coord.join(context.getSessionVariable().getQueryTimeoutS());
+ if (!coord.isDone()) {
+ coord.cancel();
+ ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
+ }
- LOG.debug("delta files is {}", coord.getDeltaUrls());
+ if (!coord.getExecStatus().ok()) {
+ errMsg = coord.getExecStatus().getErrorMsg();
+ LOG.warn("insert failed: {}", errMsg);
+ ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
+ }
- if (coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) {
- loadedRows = Long.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
- }
- if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
- filteredRows = Integer.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
- }
+ LOG.debug("delta files is {}", coord.getDeltaUrls());
- // if in strict mode, insert will fail if there are filtered rows
- if (context.getSessionVariable().getEnableInsertStrict()) {
- if (filteredRows > 0) {
- context.getState().setError("Insert has filtered data in strict mode, tracking_url="
- + coord.getTrackingUrl());
- return;
+ if (coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) {
+ loadedRows = Long.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
+ }
+ if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
+ filteredRows = Integer.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
}
- }
- if (insertStmt.getTargetTable().getType() != TableType.OLAP) {
- // no need to add load job.
- // MySQL table is already being inserted.
- context.getState().setOk(loadedRows, filteredRows, null);
- return;
- }
+ // if in strict mode, insert will fail if there are filtered rows
+ if (context.getSessionVariable().getEnableInsertStrict()) {
+ if (filteredRows > 0) {
+ context.getState().setError("Insert has filtered data in strict mode, tracking_url="
+ + coord.getTrackingUrl());
+ return;
+ }
+ }
- if (loadedRows == 0 && filteredRows == 0) {
- // if no data, just abort txn and return ok
- Catalog.getCurrentGlobalTransactionMgr().abortTransaction(insertStmt.getDbObj().getId(),
- insertStmt.getTransactionId(), TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
- context.getState().setOk();
- return;
- }
- if (Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
- insertStmt.getDbObj(), Lists.newArrayList(insertStmt.getTargetTable()), insertStmt.getTransactionId(),
- TabletCommitInfo.fromThrift(coord.getCommitInfos()),
- context.getSessionVariable().getInsertVisibleTimeoutMs())) {
- txnStatus = TransactionStatus.VISIBLE;
- MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
- } else {
- txnStatus = TransactionStatus.COMMITTED;
- }
+ if (insertStmt.getTargetTable().getType() != TableType.OLAP) {
+ // no need to add load job.
+ // MySQL table is already being inserted.
+ context.getState().setOk(loadedRows, filteredRows, null);
+ return;
+ }
- } catch (Throwable t) {
- // if any throwable being thrown during insert operation, first we should abort this txn
- LOG.warn("handle insert stmt fail: {}", label, t);
- try {
- Catalog.getCurrentGlobalTransactionMgr().abortTransaction(
- insertStmt.getDbObj().getId(), insertStmt.getTransactionId(),
- t.getMessage() == null ? "unknown reason" : t.getMessage());
- } catch (Exception abortTxnException) {
- // just print a log if abort txn failed. This failure do not need to pass to user.
- // user only concern abort how txn failed.
- LOG.warn("errors when abort txn", abortTxnException);
- }
+ if (loadedRows == 0 && filteredRows == 0) {
+ // if no data, just abort txn and return ok
+ Catalog.getCurrentGlobalTransactionMgr().abortTransaction(insertStmt.getDbObj().getId(),
+ insertStmt.getTransactionId(), TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+ context.getState().setOk();
+ return;
+ }
+ if (Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
+ insertStmt.getDbObj(), Lists.newArrayList(insertStmt.getTargetTable()), insertStmt.getTransactionId(),
+ TabletCommitInfo.fromThrift(coord.getCommitInfos()),
+ context.getSessionVariable().getInsertVisibleTimeoutMs())) {
+ txnStatus = TransactionStatus.VISIBLE;
+ MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
+ } else {
+ txnStatus = TransactionStatus.COMMITTED;
+ }
- if (!Config.using_old_load_usage_pattern) {
- // if not using old load usage pattern, error will be returned directly to user
- StringBuilder sb = new StringBuilder(t.getMessage());
- if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) {
- sb.append(". url: " + coord.getTrackingUrl());
+ } catch (Throwable t) {
+ // if any throwable being thrown during insert operation, first we should abort this txn
+ LOG.warn("handle insert stmt fail: {}", label, t);
+ try {
+ Catalog.getCurrentGlobalTransactionMgr().abortTransaction(
+ insertStmt.getDbObj().getId(), insertStmt.getTransactionId(),
+ t.getMessage() == null ? "unknown reason" : t.getMessage());
+ } catch (Exception abortTxnException) {
+ // just print a log if abort txn failed. This failure do not need to pass to user.
+ // user only concern abort how txn failed.
+ LOG.warn("errors when abort txn", abortTxnException);
}
- context.getState().setError(sb.toString());
- return;
- }
- /*
- * If config 'using_old_load_usage_pattern' is true.
- * Doris will return a label to user, and user can use this label to check load job's status,
- * which exactly like the old insert stmt usage pattern.
- */
- throwable = t;
- }
+ if (!Config.using_old_load_usage_pattern) {
+ // if not using old load usage pattern, error will be returned directly to user
+ StringBuilder sb = new StringBuilder(t.getMessage());
+ if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) {
+ sb.append(". url: " + coord.getTrackingUrl());
+ }
+ context.getState().setError(sb.toString());
+ return;
+ }
- // Go here, which means:
- // 1. transaction is finished successfully (COMMITTED or VISIBLE), or
- // 2. transaction failed but Config.using_old_load_usage_pattern is true.
- // we will record the load job info for these 2 cases
+ /*
+ * If config 'using_old_load_usage_pattern' is true.
+ * Doris will return a label to user, and user can use this label to check load job's status,
+ * which exactly like the old insert stmt usage pattern.
+ */
+ throwable = t;
+ }
- String errMsg = "";
- try {
- context.getCatalog().getLoadManager().recordFinishedLoadJob(
- label,
- insertStmt.getDb(),
- insertStmt.getTargetTable().getId(),
- EtlJobType.INSERT,
- createTime,
- throwable == null ? "" : throwable.getMessage(),
- coord.getTrackingUrl());
- } catch (MetaNotFoundException e) {
- LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
- errMsg = "Record info of insert load with error " + e.getMessage();
+ // Go here, which means:
+ // 1. transaction is finished successfully (COMMITTED or VISIBLE), or
+ // 2. transaction failed but Config.using_old_load_usage_pattern is true.
+ // we will record the load job info for these 2 cases
+
+ try {
+ context.getCatalog().getLoadManager().recordFinishedLoadJob(
+ label,
+ insertStmt.getDb(),
+ insertStmt.getTargetTable().getId(),
+ EtlJobType.INSERT,
+ createTime,
+ throwable == null ? "" : throwable.getMessage(),
+ coord.getTrackingUrl());
+ } catch (MetaNotFoundException e) {
+ LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
+ errMsg = "Record info of insert load with error " + e.getMessage();
+ }
+ txnId = insertStmt.getTransactionId();
}
// {'label':'my_label1', 'status':'visible', 'txnId':'123'}
// {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'}
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(label).append("', 'status':'").append(txnStatus.name());
- sb.append("', 'txnId':'").append(insertStmt.getTransactionId()).append("'");
+ sb.append("', 'txnId':'").append(txnId).append("'");
if (!Strings.isNullOrEmpty(errMsg)) {
sb.append(", 'err':'").append(errMsg).append("'");
}
@@ -1165,6 +1434,9 @@ public class StmtExecutor implements ProfileWriter {
if (!statisticsForAuditLog.hasScanRows()) {
statisticsForAuditLog.setScanRows(0L);
}
+ // Default the returned-rows counter to 0 only when it was never set, matching the
+ // neighbouring scanRows/cpuMs guards; without the negation a real value would be
+ // overwritten with 0 and an unset field would stay unset.
+ if (!statisticsForAuditLog.hasReturnedRows()) {
+     statisticsForAuditLog.setReturnedRows(0L);
+ }
if (!statisticsForAuditLog.hasCpuMs()) {
statisticsForAuditLog.setCpuMs(0L);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index ffd7e25..143b066 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -77,6 +77,18 @@ public class BackendServiceClient {
return stub.getInfo(request);
}
+ /** Delegates to the stub's sendData call and returns the Future it hands back. */
+ public Future<InternalService.PSendDataResult> sendData(InternalService.PSendDataRequest request) {
+ return stub.sendData(request);
+ }
+
+ /** Delegates to the stub's rollback call and returns the Future it hands back. */
+ public Future<InternalService.PRollbackResult> rollback(InternalService.PRollbackRequest request) {
+ return stub.rollback(request);
+ }
+
+ /** Delegates to the stub's commit call and returns the Future it hands back. */
+ public Future<InternalService.PCommitResult> commit(InternalService.PCommitRequest request) {
+ return stub.commit(request);
+ }
+
public Future<InternalService.PConstantExprResult> foldConstantExpr(InternalService.PConstantExprRequest request) {
return stub.foldConstantExpr(request);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 40f2897..2210645 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -32,6 +32,7 @@ import org.apache.thrift.TSerializer;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
@@ -169,6 +170,48 @@ public class BackendServiceProxy {
}
}
+ /**
+  * Packs the given rows into a PSendDataRequest tagged with fragmentInstanceId and
+  * submits it through the client obtained for the given address.
+  *
+  * @param address            backend to talk to
+  * @param fragmentInstanceId id placed in the request's fragment_instance_id field
+  * @param data               rows added to the request
+  * @return the Future returned by the client call
+  * @throws RpcException if getting the client, building the request or issuing the call throws
+  */
+ public Future<InternalService.PSendDataResult> sendData(
+         TNetworkAddress address, Types.PUniqueId fragmentInstanceId, List<InternalService.PDataRow> data)
+         throws RpcException {
+     final InternalService.PSendDataRequest.Builder requestBuilder =
+             InternalService.PSendDataRequest.newBuilder();
+     requestBuilder.setFragmentInstanceId(fragmentInstanceId);
+     requestBuilder.addAllData(data);
+     try {
+         // The request is built inside the try so that a build failure is also reported as RpcException.
+         return getProxy(address).sendData(requestBuilder.build());
+     } catch (Throwable t) {
+         LOG.warn("failed to send data, address={}:{}", address.getHostname(), address.getPort(), t);
+         throw new RpcException(address.hostname, t.getMessage());
+     }
+ }
+
+ /**
+  * Sends a PRollbackRequest carrying fragmentInstanceId to the backend at the given address.
+  *
+  * @return the Future returned by the client call
+  * @throws RpcException if getting the client or issuing the call throws
+  */
+ public Future<InternalService.PRollbackResult> rollback(TNetworkAddress address, Types.PUniqueId fragmentInstanceId)
+         throws RpcException {
+     final InternalService.PRollbackRequest.Builder requestBuilder = InternalService.PRollbackRequest.newBuilder();
+     requestBuilder.setFragmentInstanceId(fragmentInstanceId);
+     // Built before the try block, so a build failure propagates unwrapped.
+     final InternalService.PRollbackRequest rollbackRequest = requestBuilder.build();
+     try {
+         return getProxy(address).rollback(rollbackRequest);
+     } catch (Throwable t) {
+         LOG.warn("failed to rollback, address={}:{}", address.getHostname(), address.getPort(), t);
+         throw new RpcException(address.hostname, t.getMessage());
+     }
+ }
+
+ /**
+  * Sends a PCommitRequest carrying fragmentInstanceId to the backend at the given address.
+  *
+  * @return the Future returned by the client call
+  * @throws RpcException if getting the client or issuing the call throws
+  */
+ public Future<InternalService.PCommitResult> commit(TNetworkAddress address, Types.PUniqueId fragmentInstanceId)
+         throws RpcException {
+     final InternalService.PCommitRequest.Builder requestBuilder = InternalService.PCommitRequest.newBuilder();
+     requestBuilder.setFragmentInstanceId(fragmentInstanceId);
+     // Built before the try block, so a build failure propagates unwrapped.
+     final InternalService.PCommitRequest commitRequest = requestBuilder.build();
+     try {
+         return getProxy(address).commit(commitRequest);
+     } catch (Throwable t) {
+         LOG.warn("failed to commit, address={}:{}", address.getHostname(), address.getPort(), t);
+         throw new RpcException(address.hostname, t.getMessage());
+     }
+ }
+
public Future<InternalService.PConstantExprResult> foldConstantExpr(
TNetworkAddress address, TFoldConstantParams tParams) throws RpcException, TException {
final InternalService.PConstantExprRequest pRequest = InternalService.PConstantExprRequest.newBuilder()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 1bc16e7..a25f29f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -105,6 +105,8 @@ import org.apache.doris.thrift.TPrivilegeStatus;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
import org.apache.doris.thrift.TUpdateMiniEtlTaskStatusRequest;
+import org.apache.doris.thrift.TWaitingTxnStatusRequest;
+import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
@@ -126,6 +128,7 @@ import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.apache.doris.thrift.TStatusCode.NOT_IMPLEMENTED_ERROR;
@@ -663,6 +666,23 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return result;
}
+
+ /**
+  * Verifies that authCodeUuid equals the auth code stored on the transaction
+  * identified by (dbName, txnId).
+  *
+  * @param dbName       name used to look the database up in the current catalog
+  * @param txnId        id of the transaction whose auth code is compared
+  * @param authCodeUuid code supplied by the caller
+  * @throws AuthenticationException if the database or the transaction state cannot be
+  *                                 found, or if the codes differ
+  */
+ private void checkAuthCodeUuid(String dbName, long txnId, String authCodeUuid) throws AuthenticationException {
+     final Database database = Catalog.getCurrentCatalog().getDb(dbName);
+     if (database == null) {
+         throw new AuthenticationException("invalid db name: " + dbName);
+     }
+
+     final TransactionState txnState =
+             Catalog.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId);
+     if (txnState == null) {
+         throw new AuthenticationException("invalid transactionState: " + txnId);
+     }
+
+     final boolean codeMatches = authCodeUuid.equals(txnState.getAuthCode());
+     if (codeMatches) {
+         return;
+     }
+     throw new AuthenticationException(
+             "Access denied; you need (at least one of) the LOAD privilege(s) for this operation");
+ }
+
private void checkPasswordAndPrivs(String cluster, String user, String passwd, String db, String tbl,
String clientIp, PrivPredicate predicate) throws AuthenticationException {
@@ -718,7 +738,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
try {
- result.setTxnId(loadTxnBeginImpl(request, clientAddr));
+ TLoadTxnBeginResult tmpRes = loadTxnBeginImpl(request, clientAddr);
+ result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());
} catch (DuplicatedRequestException e) {
// this is a duplicate request, just return previous txn id
LOG.warn("duplicate request for stream load. request id: {}, txn: {}", e.getDuplicatedRequestId(), e.getTxnId());
@@ -740,14 +761,16 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return result;
}
- private long loadTxnBeginImpl(TLoadTxnBeginRequest request, String clientIp) throws UserException {
+ private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, String clientIp) throws UserException {
String cluster = request.getCluster();
if (Strings.isNullOrEmpty(cluster)) {
cluster = SystemInfoService.DEFAULT_CLUSTER;
}
- checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
- request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
+ if (Strings.isNullOrEmpty(request.getAuthCodeUuid())) {
+ checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
+ request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
+ }
// check label
if (Strings.isNullOrEmpty(request.getLabel())) {
@@ -769,10 +792,17 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
MetricRepo.COUNTER_LOAD_ADD.increase(1L);
- return Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
+ long txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequestId(),
new TxnCoordinator(TxnSourceType.BE, clientIp),
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
+ if (!Strings.isNullOrEmpty(request.getAuthCodeUuid())) {
+ Catalog.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), txnId)
+ .setAuthCode(request.getAuthCodeUuid());
+ }
+ TLoadTxnBeginResult result = new TLoadTxnBeginResult();
+ result.setTxnId(txnId).setDbId(db.getId());
+ return result;
}
@Override
@@ -811,6 +841,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
if (request.isSetAuthCode()) {
// TODO(cmy): find a way to check
+ } else if (request.isSetAuthCodeUuid()) {
+ checkAuthCodeUuid(request.getDb(), request.getTxnId(), request.getAuthCodeUuid());
} else {
checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
@@ -819,7 +851,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// get database
Catalog catalog = Catalog.getCurrentCatalog();
String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb());
- Database db = catalog.getDb(fullDbName);
+ Database db = null;
+ if (request.isSetDbId() && request.getDbId() > 0) {
+ db = catalog.getDb(request.getDbId());
+ } else {
+ db = catalog.getDb(fullDbName);
+ }
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
@@ -872,12 +909,19 @@ public class FrontendServiceImpl implements FrontendService.Iface {
if (request.isSetAuthCode()) {
// TODO(cmy): find a way to check
+ } else if (request.isSetAuthCodeUuid()) {
+ checkAuthCodeUuid(request.getDb(), request.getTxnId(), request.getAuthCodeUuid());
} else {
checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
}
String dbName = ClusterNamespace.getFullName(cluster, request.getDb());
- Database db = Catalog.getCurrentCatalog().getDb(dbName);
+ Database db = null;
+ if (request.isSetDbId() && request.getDbId() > 0) {
+ db = Catalog.getCurrentCatalog().getDb(request.getDbId());
+ } else {
+ db = Catalog.getCurrentCatalog().getDb(dbName);
+ }
if (db == null) {
throw new MetaNotFoundException("db " + request.getDb() + " does not exist");
}
@@ -1003,6 +1047,24 @@ public class FrontendServiceImpl implements FrontendService.Iface {
TNetworkAddress addr = getClientAddr();
return addr == null ? "unknown" : addr.hostname;
}
+
+ /**
+  * Thrift entry point that asks the global transaction manager for the status of a
+  * transaction and maps the outcome onto the status code of the result:
+  * OK on success, INCOMPLETE on TimeoutException, INTERNAL_ERROR on AnalysisException.
+  * In the two failure cases the exception message is appended to the error messages.
+  */
+ @Override
+ public TWaitingTxnStatusResult waitingTxnStatus(TWaitingTxnStatusRequest request) throws TException {
+     // Fallback answer used when the manager throws; replaced by the manager's result on success.
+     TWaitingTxnStatusResult response = new TWaitingTxnStatusResult();
+     response.setStatus(new TStatus());
+     try {
+         response = Catalog.getCurrentGlobalTransactionMgr().getWaitingTxnStatus(request);
+         response.status.setStatusCode(TStatusCode.OK);
+     } catch (TimeoutException timeout) {
+         response.status.setStatusCode(TStatusCode.INCOMPLETE);
+         response.status.addToErrorMsgs(timeout.getMessage());
+     } catch (AnalysisException invalid) {
+         response.status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+         response.status.addToErrorMsgs(invalid.getMessage());
+     }
+     return response;
+ }
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 121ff60..23b42ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -654,6 +654,20 @@ public class DatabaseTransactionMgr {
}
}
+ /**
+  * Returns the largest transaction id recorded for the given label.
+  * The lookup runs under the read lock.
+  *
+  * @param label label to look up
+  * @return the largest id, or null when no id is recorded for the label
+  */
+ public Long getTransactionId(String label) {
+     readLock();
+     try {
+         Set<Long> idsForLabel = unprotectedGetTxnIdsByLabel(label);
+         if (idsForLabel == null || idsForLabel.isEmpty()) {
+             return null;
+         }
+         // the latest txn is the one with the largest id
+         Long latest = null;
+         for (Long candidate : idsForLabel) {
+             if (latest == null || candidate > latest) {
+                 latest = candidate;
+             }
+         }
+         return latest;
+     } finally {
+         readUnlock();
+     }
+ }
+
public List<TransactionState> getCommittedTxnList() {
readLock();
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 6444b6e..83ebf07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -32,7 +32,10 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.persist.BatchRemoveTransactionsOperation;
import org.apache.doris.persist.EditLog;
+import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.thrift.TWaitingTxnStatusRequest;
+import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
@@ -52,6 +55,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Transaction Manager
@@ -159,6 +163,16 @@ public class GlobalTransactionMgr implements Writable {
}
+ /**
+  * Looks up the transaction id for a label through the transaction manager of database dbId.
+  *
+  * @return the id reported by the per-database manager, or null when that lookup
+  *         raises an AnalysisException (the failure is logged)
+  */
+ public Long getTransactionId(long dbId, String label) {
+     try {
+         return getDatabaseTransactionMgr(dbId).getTransactionId(label);
+     } catch (AnalysisException e) {
+         LOG.warn("Get transaction id by label " + label + " failed", e);
+         return null;
+     }
+ }
+
public void commitTransaction(long dbId, List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException {
commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null);
@@ -495,4 +509,43 @@ public class GlobalTransactionMgr implements Writable {
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
dbTransactionMgr.updateDatabaseUsedQuotaData(usedQuotaDataBytes);
}
+
+ /**
+  * Polls once per second, for at most Config.commit_timeout_second rounds, until the
+  * requested transaction is UNKNOWN or in a final status, and returns that status.
+  * The transaction is identified by txn id when the request carries one, otherwise by label.
+  *
+  * @param request carries the db id and either a txn id or a label
+  * @return a result holding the status value and, when looked up by txn id and a
+  *         non-blank reason exists, that reason as an error message
+  * @throws AnalysisException if the database or (by id) the transaction does not exist
+  * @throws TimeoutException  if no final status is reached within the rounds, or if the
+  *                           waiting thread is interrupted
+  */
+ public TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws AnalysisException, TimeoutException {
+     long dbId = request.getDbId();
+     int commitTimeoutSec = Config.commit_timeout_second;
+     for (int i = 0; i < commitTimeoutSec; ++i) {
+         // Re-checked every round; the lookup result can change while we wait.
+         Database db = Catalog.getCurrentCatalog().getDb(dbId);
+         if (db == null) {
+             throw new AnalysisException("invalid db id: " + dbId);
+         }
+         TWaitingTxnStatusResult statusResult = new TWaitingTxnStatusResult();
+         statusResult.status = new TStatus();
+         TransactionStatus txnStatus = null;
+         if (request.isSetTxnId()) {
+             long txnId = request.getTxnId();
+             TransactionState txnState = Catalog.getCurrentGlobalTransactionMgr().
+                     getTransactionState(dbId, txnId);
+             if (txnState == null) {
+                 throw new AnalysisException("txn does not exist: " + txnId);
+             }
+             txnStatus = txnState.getTransactionStatus();
+             // Guard against a missing reason before trimming it.
+             String reason = txnState.getReason();
+             if (reason != null && !reason.trim().isEmpty()) {
+                 statusResult.status.setErrorMsgsIsSet(true);
+                 statusResult.status.addToErrorMsgs(reason);
+             }
+         } else {
+             txnStatus = getLabelState(dbId, request.getLabel());
+         }
+         if (txnStatus == TransactionStatus.UNKNOWN || txnStatus.isFinalStatus()) {
+             statusResult.setTxnStatusId(txnStatus.value());
+             return statusResult;
+         }
+         try {
+             Thread.sleep(1000L);
+         } catch (InterruptedException e) {
+             LOG.info("commit sleep exception.", e);
+             // Do not swallow the interrupt: restore the flag for the caller and stop
+             // waiting instead of polling on as if nothing had happened.
+             Thread.currentThread().interrupt();
+             throw new TimeoutException("Operation is interrupted");
+         }
+     }
+     throw new TimeoutException("Operation is timeout");
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
new file mode 100644
index 0000000..4db596d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TTxnParams;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Mutable holder that groups what is tracked for one transaction: its label, the
+ * database and table, a backend, the TTxnParams, a list of PDataRow to send and a
+ * running row count. It contains no logic beyond accessors and three state predicates
+ * derived from the TTxnParams.
+ */
+public class TransactionEntry {
+
+    // Value of TTxnParams.txnId that the predicates below treat as "not begun yet".
+    private static final long TXN_ID_NOT_BEGUN = -1;
+
+    private String label = "";
+    private Database db;
+    private Table table;
+    private Backend backend;
+    private TTxnParams txnConf;
+    private List<InternalService.PDataRow> dataToSend = new ArrayList<>();
+    private long rowsInTransaction;
+
+    /** Creates an entry with an empty label, no db/table/backend/params and no rows. */
+    public TransactionEntry() {
+    }
+
+    /** Creates an entry bound to the given params, database and table. */
+    public TransactionEntry(TTxnParams txnConf, Database db, Table table) {
+        this.db = db;
+        this.table = table;
+        this.txnConf = txnConf;
+    }
+
+    public String getLabel() {
+        return this.label;
+    }
+
+    public void setLabel(String label) {
+        this.label = label;
+    }
+
+    public Database getDb() {
+        return this.db;
+    }
+
+    public void setDb(Database db) {
+        this.db = db;
+    }
+
+    public Table getTable() {
+        return this.table;
+    }
+
+    public void setTable(Table table) {
+        this.table = table;
+    }
+
+    public Backend getBackend() {
+        return this.backend;
+    }
+
+    public void setBackend(Backend backend) {
+        this.backend = backend;
+    }
+
+    public TTxnParams getTxnConf() {
+        return this.txnConf;
+    }
+
+    public void setTxnConf(TTxnParams txnConf) {
+        this.txnConf = txnConf;
+    }
+
+    /** True when params are present and their needTxn flag is set. */
+    public boolean isTxnModel() {
+        if (txnConf == null) {
+            return false;
+        }
+        return txnConf.isNeedTxn();
+    }
+
+    /** True in transaction mode while the txn id is still -1. */
+    public boolean isTxnIniting() {
+        return isTxnModel() && txnConf.getTxnId() == TXN_ID_NOT_BEGUN;
+    }
+
+    /** True in transaction mode once the txn id is no longer -1. */
+    public boolean isTxnBegin() {
+        return isTxnModel() && txnConf.getTxnId() != TXN_ID_NOT_BEGUN;
+    }
+
+    /** Returns the internal list itself, not a copy. */
+    public List<InternalService.PDataRow> getDataToSend() {
+        return this.dataToSend;
+    }
+
+    public void setDataToSend(List<InternalService.PDataRow> dataToSend) {
+        this.dataToSend = dataToSend;
+    }
+
+    /** Empties the current row list in place. */
+    public void clearDataToSend() {
+        this.dataToSend.clear();
+    }
+
+    public long getRowsInTransaction() {
+        return this.rowsInTransaction;
+    }
+
+    public void setRowsInTransaction(long rowsInTransaction) {
+        this.rowsInTransaction = rowsInTransaction;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 23c8053..28a77e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -45,6 +45,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -205,6 +206,7 @@ public class TransactionState implements Writable {
// 3. in afterStateTransform(), callback object can not be found, so the write lock can not be released.
private TxnStateChangeCallback callback = null;
private long timeoutMs = Config.stream_load_default_timeout_second;
+ private String authCode = "";
// is set to true, we will double the publish timeout
private boolean prolongPublishTimeout = false;
@@ -241,6 +243,7 @@ public class TransactionState implements Writable {
this.publishVersionTasks = Maps.newHashMap();
this.hasSendTask = false;
this.latch = new CountDownLatch(1);
+ this.authCode = UUID.randomUUID().toString();
}
public TransactionState(long dbId, List<Long> tableIdList, long transactionId, String label, TUniqueId requestId,
@@ -264,8 +267,16 @@ public class TransactionState implements Writable {
this.latch = new CountDownLatch(1);
this.callbackId = callbackId;
this.timeoutMs = timeoutMs;
+ this.authCode = UUID.randomUUID().toString();
}
-
+
+ /** Replaces the auth code, which the constructors initialise to a random UUID string. */
+ public void setAuthCode(String authCode) {
+ this.authCode = authCode;
+ }
+ /** Returns the auth code currently stored on this transaction state. */
+ public String getAuthCode() {
+ return authCode;
+ }
+
public void setErrorReplicas(Set<Long> newErrorReplicas) {
this.errorReplicas = newErrorReplicas;
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index 5d7ddb5..506b08c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -67,6 +67,7 @@ public class StmtExecutorTest {
private ConnectContext ctx;
private QueryState state;
private ConnectScheduler scheduler;
+ private MysqlChannel channel = null;
@Mocked
SocketChannel socketChannel;
@@ -92,7 +93,7 @@ public class StmtExecutorTest {
MysqlSerializer serializer = MysqlSerializer.newInstance();
Catalog catalog = AccessTestUtil.fetchAdminCatalog();
- MysqlChannel channel = new MysqlChannel(socketChannel);
+ channel = new MysqlChannel(socketChannel);
new Expectations(channel) {
{
channel.sendOnePacket((ByteBuffer) any);
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index d900073..4ebb671 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -256,6 +256,39 @@ message PProxyResult {
optional PKafkaPartitionOffsets partition_offsets = 3;
};
+// A single column value, carried as a string.
+message PDataColumn {
+ optional string value = 1;
+}
+
+// One row: an ordered list of column values.
+message PDataRow {
+ repeated PDataColumn col = 1;
+}
+
+// Request of the send_data rpc: a batch of rows addressed by fragment instance id.
+message PSendDataRequest {
+ required PUniqueId fragment_instance_id = 1;
+ repeated PDataRow data = 2;
+}
+
+// Response of the send_data rpc; carries only a status.
+message PSendDataResult {
+ required PStatus status = 1;
+}
+
+// Request of the commit rpc, addressed by fragment instance id.
+message PCommitRequest {
+ required PUniqueId fragment_instance_id = 1;
+}
+
+// Response of the commit rpc; carries only a status.
+message PCommitResult {
+ required PStatus status = 1;
+}
+
+// Request of the rollback rpc, addressed by fragment instance id.
+message PRollbackRequest {
+ required PUniqueId fragment_instance_id = 1;
+}
+
+// Response of the rollback rpc; carries only a status.
+message PRollbackResult {
+ required PStatus status = 1;
+}
+
message PBloomFilter {
required bool always_true = 2;
required int32 filter_length = 1;
@@ -358,6 +391,9 @@ service PBackendService {
rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse);
rpc fetch_cache(PFetchCacheRequest) returns (PFetchCacheResult);
rpc clear_cache(PClearCacheRequest) returns (PCacheResponse);
+ rpc send_data(PSendDataRequest) returns (PSendDataResult);
+ rpc commit(PCommitRequest) returns (PCommitResult);
+ rpc rollback(PRollbackRequest) returns (PRollbackResult);
rpc merge_filter(PMergeFilterRequest) returns (PMergeFilterResponse);
rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse);
rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 0f8b420..78e4ea3 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -503,6 +503,7 @@ struct TMiniLoadBeginRequest {
11: optional i64 auth_code
12: optional i64 create_timestamp
13: optional Types.TUniqueId request_id
+ 14: optional string auth_code_uuid
}
struct TIsMethodSupportedRequest {
@@ -533,12 +534,14 @@ struct TLoadTxnBeginRequest {
// The real value of timeout should be i32. i64 ensures the compatibility of interface.
10: optional i64 timeout
11: optional Types.TUniqueId request_id
+ 12: optional string auth_code_uuid
}
struct TLoadTxnBeginResult {
1: required Status.TStatus status
2: optional i64 txnId
3: optional string job_status // if label already used, set status of existing job
+ 4: optional i64 db_id
}
// StreamLoad request, used to load a data stream into the engine
@@ -587,6 +590,7 @@ struct TStreamLoadPutRequest {
31: optional bool fuzzy_parse
32: optional string line_delimiter
33: optional bool read_json_by_line
+ 34: optional string auth_code_uuid
}
struct TStreamLoadPutResult {
@@ -638,6 +642,8 @@ struct TLoadTxnCommitRequest {
10: optional i64 auth_code
11: optional TTxnCommitAttachment txnCommitAttachment
12: optional i64 thrift_rpc_timeout_ms
+ 13: optional string auth_code_uuid
+ 14: optional i64 db_id
}
struct TLoadTxnCommitResult {
@@ -655,6 +661,8 @@ struct TLoadTxnRollbackRequest {
8: optional string reason
9: optional i64 auth_code
10: optional TTxnCommitAttachment txnCommitAttachment
+ 11: optional string auth_code_uuid
+ 12: optional i64 db_id
}
struct TLoadTxnRollbackResult {
@@ -695,6 +703,17 @@ struct TPropertyVal {
4: optional bool boolVal
}
+struct TWaitingTxnStatusRequest {
+ 1: optional i64 db_id
+ 2: optional i64 txn_id
+ 3: optional string label
+}
+
+struct TWaitingTxnStatusResult {
+ 1: optional Status.TStatus status
+ 2: optional i32 txn_status_id
+}
+
service FrontendService {
TGetDbsResult getDbNames(1:TGetDbsParams params)
TGetTablesResult getTableNames(1:TGetTablesParams params)
@@ -727,6 +746,8 @@ service FrontendService {
TLoadTxnCommitResult loadTxnCommit(1: TLoadTxnCommitRequest request)
TLoadTxnRollbackResult loadTxnRollback(1: TLoadTxnRollbackRequest request)
+ TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest request)
+
TStreamLoadPutResult streamLoadPut(1: TStreamLoadPutRequest request)
Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request)
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index ae64ef5..434c661 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -244,6 +244,18 @@ enum PaloInternalServiceVersion {
V1
}
+struct TTxnParams {
+ 1: optional bool need_txn
+ 2: optional string auth_code_uuid
+ 3: optional i64 thrift_rpc_timeout_ms
+ 4: optional string db
+ 5: optional string tbl
+ 6: optional string user_ip
+ 7: optional i64 txn_id
+ 8: optional Types.TUniqueId fragment_instance_id
+ 9: optional i64 db_id
+ 10: optional double max_filter_ratio
+}
// ExecPlanFragment
@@ -299,6 +311,7 @@ struct TExecPlanFragmentParams {
// If true, all @Common components are unset and should be fetched from BE's cache
// If this field is unset or is set to false, all @Common components are set.
16: optional bool is_simplified_param
+ 17: optional TTxnParams txn_conf
}
struct TExecPlanFragmentResult {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 8d968dc..029f5c4 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -106,6 +106,7 @@ enum TFileFormatType {
FORMAT_CSV_DEFLATE,
FORMAT_ORC,
FORMAT_JSON,
+ FORMAT_PROTO,
}
struct THdfsConf {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org