You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/20 23:07:50 UTC
[doris] branch dev-1.1.1 updated: [cherry-pick][Vectorized] Support ODBC sink for vec exec engine (#11045) (#11047)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
new 53782c616b [cherry-pick][Vectorized] Support ODBC sink for vec exec engine (#11045) (#11047)
53782c616b is described below
commit 53782c616b0d560cc4bbc6f3c9b1294a043e6965
Author: yiguolei <67...@qq.com>
AuthorDate: Thu Jul 21 07:07:45 2022 +0800
[cherry-pick][Vectorized] Support ODBC sink for vec exec engine (#11045) (#11047)
---
be/src/exec/data_sink.cpp | 17 ++--
be/src/exec/odbc_connector.cpp | 112 +++++++++++++++++++++
be/src/exec/odbc_connector.h | 10 +-
be/src/runtime/odbc_table_sink.cpp | 2 +-
be/src/vec/CMakeLists.txt | 1 +
.../sink/vodbc_table_sink.cpp} | 76 +++++++-------
be/src/vec/sink/vodbc_table_sink.h | 72 +++++++++++++
7 files changed, 247 insertions(+), 43 deletions(-)
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index a11556310a..ca6f704a38 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -34,13 +34,13 @@
#include "runtime/result_file_sink.h"
#include "runtime/result_sink.h"
#include "runtime/runtime_state.h"
-
#include "vec/sink/result_sink.h"
#include "vec/sink/vdata_stream_sender.h"
+#include "vec/sink/vmysql_table_sink.h"
#include "vec/sink/vmysql_table_writer.h"
+#include "vec/sink/vodbc_table_sink.h"
#include "vec/sink/vresult_file_sink.h"
#include "vec/sink/vtablet_sink.h"
-#include "vec/sink/vmysql_table_sink.h"
namespace doris {
@@ -81,7 +81,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
// TODO: figure out good buffer size based on size of output row
if (is_vec) {
- tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs, thrift_sink.result_sink, 4096);
+ tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs,
+ thrift_sink.result_sink, 4096);
} else {
tmp_sink = new ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024);
}
@@ -139,7 +140,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
return Status::InternalError("Missing data buffer sink.");
}
if (is_vec) {
- doris::vectorized::VMysqlTableSink* vmysql_tbl_sink = new doris::vectorized::VMysqlTableSink(pool, row_desc, output_exprs);
+ doris::vectorized::VMysqlTableSink* vmysql_tbl_sink =
+ new doris::vectorized::VMysqlTableSink(pool, row_desc, output_exprs);
sink->reset(vmysql_tbl_sink);
} else {
// TODO: figure out good buffer size based on size of output row
@@ -156,8 +158,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.odbc_table_sink) {
return Status::InternalError("Missing data odbc sink.");
}
- OdbcTableSink* odbc_tbl_sink = new OdbcTableSink(pool, row_desc, output_exprs);
- sink->reset(odbc_tbl_sink);
+ if (is_vec) {
+ sink->reset(new vectorized::VOdbcTableSink(pool, row_desc, output_exprs));
+ } else {
+ sink->reset(new OdbcTableSink(pool, row_desc, output_exprs));
+ }
break;
}
diff --git a/be/src/exec/odbc_connector.cpp b/be/src/exec/odbc_connector.cpp
index fe5f05310f..2fd296dfee 100644
--- a/be/src/exec/odbc_connector.cpp
+++ b/be/src/exec/odbc_connector.cpp
@@ -26,6 +26,9 @@
#include "exprs/expr.h"
#include "runtime/primitive_type.h"
#include "util/types.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
#define ODBC_DISPOSE(h, ht, x, op) \
{ \
@@ -397,4 +400,113 @@ std::string ODBCConnector::handle_diagnostic_record(SQLHANDLE hHandle, SQLSMALLI
return diagnostic_msg;
}
+// Vectorized counterpart of the RowBatch overload of append(): renders rows of `block`,
+// starting at `start_send_row`, into one multi-row
+// "INSERT INTO <table_name> VALUES (...),(...)" statement and executes it through ODBC.
+//
+// The statement stops growing when the block is exhausted or the buffer reaches
+// INSERT_BUFFER_SIZE, so callers loop and advance `start_send_row` by `*num_rows_sent`,
+// which is incremented once per rendered row (it is not reset here).
+// `_output_vexpr_ctxs[j]` supplies the logical type of column j of `block`.
+Status ODBCConnector::append(const std::string& table_name, vectorized::Block* block,
+                             const std::vector<vectorized::VExprContext*>& _output_vexpr_ctxs,
+                             uint32_t start_send_row, uint32_t* num_rows_sent) {
+    _insert_stmt_buffer.clear();
+    std::u16string insert_stmt;
+    {
+        SCOPED_TIMER(_convert_tuple_timer);
+        fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name);
+
+        int num_rows = block->rows();
+        int num_columns = block->columns();
+        for (int i = start_send_row; i < num_rows; ++i) {
+            (*num_rows_sent)++;
+
+            // Construct insert statement of odbc table
+            for (int j = 0; j < num_columns; ++j) {
+                if (j != 0) {
+                    fmt::format_to(_insert_stmt_buffer, "{}", ", ");
+                }
+                auto [item, size] = block->get_by_position(j).column->get_data_at(i);
+                // A cell whose data pointer is null is written as SQL NULL.
+                if (item == nullptr) {
+                    fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
+                    continue;
+                }
+                switch (_output_vexpr_ctxs[j]->root()->type().type) {
+                case TYPE_BOOLEAN:
+                case TYPE_TINYINT:
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const int8_t*>(item));
+                    break;
+                case TYPE_SMALLINT:
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const int16_t*>(item));
+                    break;
+                case TYPE_INT:
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const int32_t*>(item));
+                    break;
+                case TYPE_BIGINT:
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const int64_t*>(item));
+                    break;
+                case TYPE_FLOAT:
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const float*>(item));
+                    break;
+                case TYPE_DOUBLE:
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const double*>(item));
+                    break;
+                case TYPE_DATE:
+                case TYPE_DATETIME: {
+                    vectorized::VecDateTimeValue value =
+                            binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(
+                                    *reinterpret_cast<const int64_t*>(item));
+
+                    char buf[64];
+                    char* pos = value.to_string(buf);
+                    std::string_view str(buf, pos - buf - 1);
+                    fmt::format_to(_insert_stmt_buffer, "'{}'", str);
+                    break;
+                }
+                case TYPE_VARCHAR:
+                case TYPE_CHAR:
+                case TYPE_STRING: {
+                    // Double embedded single quotes (the SQL-standard escape) so values such as
+                    // O'Brien neither break the statement nor inject SQL into it.
+                    // NOTE(review): backends that also treat '\' as an escape character inside
+                    // string literals (e.g. MySQL's default sql_mode) may need more escaping.
+                    _insert_stmt_buffer.push_back('\'');
+                    for (char c : std::string_view(item, size)) {
+                        if (c == '\'') {
+                            _insert_stmt_buffer.push_back('\'');
+                        }
+                        _insert_stmt_buffer.push_back(c);
+                    }
+                    _insert_stmt_buffer.push_back('\'');
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    DecimalV2Value value = *reinterpret_cast<const DecimalV2Value*>(item);
+                    fmt::format_to(_insert_stmt_buffer, "{}", value.to_string());
+                    break;
+                }
+                case TYPE_LARGEINT: {
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const __int128*>(item));
+                    break;
+                }
+                default: {
+                    // Report the type from the contexts passed in. VOdbcTableSink does not set
+                    // _odbc_param.output_expr_ctxs, so the connector's own _output_expr_ctxs is
+                    // presumably empty on this path and must not be indexed.
+                    return Status::InternalError("can't convert this type to odbc type. type = {}",
+                                                 _output_vexpr_ctxs[j]->root()->type().type);
+                }
+                }
+            }
+
+            if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) {
+                fmt::format_to(_insert_stmt_buffer, "{}", "),(");
+            } else {
+                // batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt
+                fmt::format_to(_insert_stmt_buffer, "{}", ")");
+                break;
+            }
+        }
+        // Translate utf8 string to utf16 to use unicode encoding
+        insert_stmt = utf8_to_wstring(
+                std::string(_insert_stmt_buffer.data(),
+                            _insert_stmt_buffer.data() + _insert_stmt_buffer.size()));
+        // fmt::memory_buffer is not NUL-terminated, but ODBC_DISPOSE below only receives a raw
+        // pointer to the statement text; terminate it so that pointer is a valid C string.
+        _insert_stmt_buffer.push_back('\0');
+    }
+
+    {
+        SCOPED_TIMER(_result_send_timer);
+        ODBC_DISPOSE(_stmt, SQL_HANDLE_STMT,
+                     SQLExecDirectW(_stmt, (SQLWCHAR*)(insert_stmt.c_str()), SQL_NTS),
+                     _insert_stmt_buffer.data());
+    }
+    COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
+    return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/exec/odbc_connector.h b/be/src/exec/odbc_connector.h
index 690289c47b..77066e3f44 100644
--- a/be/src/exec/odbc_connector.h
+++ b/be/src/exec/odbc_connector.h
@@ -34,6 +34,10 @@
namespace doris {
+// Forward declarations for the vectorized append() overload declared below.
+namespace vectorized {
+class Block;
+class VExprContext;
+} // namespace vectorized
+
struct ODBCConnectorParam {
std::string connect_string;
@@ -74,7 +78,11 @@ public:
// write for ODBC table
Status init_to_write(RuntimeProfile* profile);
Status append(const std::string& table_name, RowBatch* batch, uint32_t start_send_row,
- uint32_t* num_row_sent);
+ uint32_t* num_rows_sent);
+
+ Status append(const std::string& table_name, vectorized::Block* block,
+ const std::vector<vectorized::VExprContext*>& _output_vexpr_ctxs,
+ uint32_t start_send_row, uint32_t* num_rows_sent);
// use in ODBC transaction
Status begin_trans(); // should be call after connect and before query or init_to_write
diff --git a/be/src/runtime/odbc_table_sink.cpp b/be/src/runtime/odbc_table_sink.cpp
index 8804194b44..752f58d17d 100644
--- a/be/src/runtime/odbc_table_sink.cpp
+++ b/be/src/runtime/odbc_table_sink.cpp
@@ -58,7 +58,7 @@ Status OdbcTableSink::prepare(RuntimeState* state) {
// Prepare the exprs to run.
RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _mem_tracker));
std::stringstream title;
- title << "ODBC_TABLE_SINK (frag_id=" << state->fragment_instance_id() << ")";
+ title << _name << " (frag_id=" << state->fragment_instance_id() << ")";
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
return Status::OK();
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index afc95e77a9..5a81b42ccd 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -173,6 +173,7 @@ set(VEC_FILES
sink/vtablet_sink.cpp
sink/vmysql_table_writer.cpp
sink/vmysql_table_sink.cpp
+ sink/vodbc_table_sink.cpp
sink/vresult_file_sink.cpp
runtime/vdatetime_value.cpp
runtime/vdata_stream_recvr.cpp
diff --git a/be/src/runtime/odbc_table_sink.cpp b/be/src/vec/sink/vodbc_table_sink.cpp
similarity index 56%
copy from be/src/runtime/odbc_table_sink.cpp
copy to be/src/vec/sink/vodbc_table_sink.cpp
index 8804194b44..c92245ec5c 100644
--- a/be/src/runtime/odbc_table_sink.cpp
+++ b/be/src/vec/sink/vodbc_table_sink.cpp
@@ -15,58 +15,57 @@
// specific language governing permissions and limitations
// under the License.
-#include "runtime/odbc_table_sink.h"
+#include "vec/sink/vodbc_table_sink.h"
#include <sstream>
-#include "exprs/expr.h"
-#include "runtime/runtime_state.h"
#include "runtime/mem_tracker.h"
-#include "util/runtime_profile.h"
+#include "runtime/runtime_state.h"
#include "util/debug_util.h"
+#include "util/runtime_profile.h"
+#include "vec/core/materialize_block.h"
+#include "vec/exprs/vexpr.h"
namespace doris {
+namespace vectorized {
-OdbcTableSink::OdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
+// Stores the plan inputs; output expressions are created in init() and the ODBC writer in open().
+VOdbcTableSink::VOdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& t_exprs)
: _pool(pool),
_row_desc(row_desc),
_t_output_expr(t_exprs),
- _mem_tracker(MemTracker::CreateTracker(-1, "OdbcTableSink")) {
- _name = "OOBC_TABLE_SINK";
+ _mem_tracker(MemTracker::create_tracker(-1, "VOdbcTableSink")) {
+ _name = "VOdbcTableSink";
}
-OdbcTableSink::~OdbcTableSink() = default;
-
-Status OdbcTableSink::init(const TDataSink& t_sink) {
+// Copies the ODBC sink settings (connect string, table, transaction flag) from thrift and
+// builds the output expression trees.
+Status VOdbcTableSink::init(const TDataSink& t_sink) {
RETURN_IF_ERROR(DataSink::init(t_sink));
- // From the thrift expressions create the real exprs.
- RETURN_IF_ERROR(Expr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs));
-
const TOdbcTableSink& t_odbc_sink = t_sink.odbc_table_sink;
_odbc_param.connect_string = t_odbc_sink.connect_string;
- _odbc_param.output_expr_ctxs = _output_expr_ctxs;
_odbc_tbl = t_odbc_sink.table;
_use_transaction = t_odbc_sink.use_transaction;
+ // NOTE(review): unlike OdbcTableSink::init, _odbc_param.output_expr_ctxs is not set here;
+ // the vectorized ODBCConnector::append() receives the expression contexts as an argument.
+ // From the thrift expressions create the real exprs.
+ RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs));
return Status::OK();
}
-Status OdbcTableSink::prepare(RuntimeState* state) {
+// Prepares the output expressions against the input row descriptor and creates this sink's
+// runtime profile, titled "<sink name> (frag_id=<fragment instance id>)".
+Status VOdbcTableSink::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(DataSink::prepare(state));
     // Prepare the exprs to run.
-    RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _mem_tracker));
+    RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _row_desc, _mem_tracker));
     std::stringstream title;
-    title << "ODBC_TABLE_SINK (frag_id=" << state->fragment_instance_id() << ")";
+    // Build the title from the sink name, as OdbcTableSink::prepare does.
+    title << _name << " (frag_id=" << state->fragment_instance_id() << ")";
     // create profile
     _profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
     return Status::OK();
 }
-Status OdbcTableSink::open(RuntimeState* state) {
+// Opens the output expressions, then creates the ODBC writer from _odbc_param and opens it.
+Status VOdbcTableSink::open(RuntimeState* state) {
// Prepare the exprs to run.
- RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));
+ RETURN_IF_ERROR(VExpr::open(_output_expr_ctxs, state));
+
// create writer
_writer.reset(new ODBCConnector(_odbc_param));
RETURN_IF_ERROR(_writer->open());
@@ -77,30 +76,37 @@ Status OdbcTableSink::open(RuntimeState* state) {
return Status::OK();
}
-Status OdbcTableSink::send(RuntimeState* state, RowBatch* batch) {
-    if (batch == nullptr || batch->num_rows() == 0) {
-        return Status::OK();
+// Row-batch input is not supported by the vectorized sink.
+Status VOdbcTableSink::send(RuntimeState* state, RowBatch* batch) {
+    return Status::NotSupported(
+            "Not Implemented VOdbcTableSink::send(RuntimeState* state, RowBatch* batch)");
+}
+
+// Evaluates the output expressions on `block` and writes the result to the ODBC table.
+// ODBCConnector::append() reports how many rows it consumed, so keep calling it until every
+// row of the evaluated block has been sent.
+Status VOdbcTableSink::send(RuntimeState* state, Block* block) {
+    Status status = Status::OK();
+    if (block == nullptr || block->rows() == 0) {
+        return status;
     }
+
+    auto output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
+            _output_expr_ctxs, *block, status);
+    // Expression evaluation reports failure through `status`; stop before writing anything.
+    if (UNLIKELY(!status.ok())) return status;
+    materialize_block_inplace(output_block);
+
     uint32_t start_send_row = 0;
     uint32_t num_row_sent = 0;
-    while (start_send_row < batch->num_rows()) {
-        auto status = _writer->append(_odbc_tbl, batch, start_send_row, &num_row_sent);
+    while (start_send_row < output_block.rows()) {
+        status = _writer->append(_odbc_tbl, &output_block, _output_expr_ctxs, start_send_row,
+                                 &num_row_sent);
         if (UNLIKELY(!status.ok())) return status;
         start_send_row += num_row_sent;
         num_row_sent = 0;
     }
-    return Status::OK();
-}
-Status OdbcTableSink::close(RuntimeState* state, Status exec_status) {
-    if (_closed) {
-        return Status::OK();
-    }
-    Expr::close(_output_expr_ctxs, state);
-    if (exec_status.ok() && _use_transaction) {
-        RETURN_IF_ERROR(_writer->finish_trans());
-    }
-    return DataSink::close(state, exec_status);
+    return Status::OK();
 }
+
+// Closes the output expressions and, when execution succeeded and the sink writes inside a
+// transaction, commits it with finish_trans(), mirroring OdbcTableSink::close. Returns early
+// if `_closed` is already set.
+Status VOdbcTableSink::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
+    VExpr::close(_output_expr_ctxs, state);
+    // _writer is only created in open(); guard against close() after an early failure.
+    if (exec_status.ok() && _use_transaction && _writer != nullptr) {
+        RETURN_IF_ERROR(_writer->finish_trans());
+    }
+    return DataSink::close(state, exec_status);
 }
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/vodbc_table_sink.h b/be/src/vec/sink/vodbc_table_sink.h
new file mode 100644
index 0000000000..75c5348327
--- /dev/null
+++ b/be/src/vec/sink/vodbc_table_sink.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "common/status.h"
+#include "exec/data_sink.h"
+#include "exec/odbc_connector.h"
+#include "vec/sink/vmysql_table_writer.h"
+
+namespace doris {
+
+class RowDescriptor;
+class TExpr;
+class RuntimeState;
+class RuntimeProfile;
+class MemTracker;
+namespace vectorized {
+
+// Vectorized data sink that writes a fragment's output blocks into an external table through
+// ODBC (via ODBCConnector). Row-batch input is rejected; data arrives through send(Block*).
+class VOdbcTableSink : public DataSink {
+public:
+    VOdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                   const std::vector<TExpr>& t_exprs);
+
+    Status init(const TDataSink& thrift_sink) override;
+
+    Status prepare(RuntimeState* state) override;
+
+    Status open(RuntimeState* state) override;
+
+    Status send(RuntimeState* state, RowBatch* batch) override;
+
+    Status send(RuntimeState* state, vectorized::Block* block) override;
+
+    // Closes the output expression contexts. Further send() calls are illegal after
+    // calling close().
+    Status close(RuntimeState* state, Status exec_status) override;
+
+    // Null until prepare() has created the profile.
+    RuntimeProfile* profile() override { return _profile; }
+
+private:
+    // owned by RuntimeState
+    ObjectPool* _pool;
+    const RowDescriptor& _row_desc;
+    const std::vector<TExpr>& _t_output_expr;
+
+    std::vector<VExprContext*> _output_expr_ctxs;
+
+    // Created in prepare(); initialized so profile() never returns an indeterminate pointer.
+    RuntimeProfile* _profile = nullptr;
+    std::shared_ptr<MemTracker> _mem_tracker;
+
+    ODBCConnectorParam _odbc_param;
+    std::string _odbc_tbl;
+    std::unique_ptr<ODBCConnector> _writer;
+    // whether use transaction; set from the thrift sink in init()
+    bool _use_transaction = false;
+};
+} // namespace vectorized
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org