You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/20 23:07:50 UTC

[doris] branch dev-1.1.1 updated: [cherry-pick][Vectorized] Support ODBC sink for vec exec engine (#11045) (#11047)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
     new 53782c616b [cherry-pick][Vectorized] Support ODBC sink for vec exec engine (#11045) (#11047)
53782c616b is described below

commit 53782c616b0d560cc4bbc6f3c9b1294a043e6965
Author: yiguolei <67...@qq.com>
AuthorDate: Thu Jul 21 07:07:45 2022 +0800

    [cherry-pick][Vectorized] Support ODBC sink for vec exec engine (#11045) (#11047)
---
 be/src/exec/data_sink.cpp                          |  17 ++--
 be/src/exec/odbc_connector.cpp                     | 112 +++++++++++++++++++++
 be/src/exec/odbc_connector.h                       |  10 +-
 be/src/runtime/odbc_table_sink.cpp                 |   2 +-
 be/src/vec/CMakeLists.txt                          |   1 +
 .../sink/vodbc_table_sink.cpp}                     |  76 +++++++-------
 be/src/vec/sink/vodbc_table_sink.h                 |  72 +++++++++++++
 7 files changed, 247 insertions(+), 43 deletions(-)

diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index a11556310a..ca6f704a38 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -34,13 +34,13 @@
 #include "runtime/result_file_sink.h"
 #include "runtime/result_sink.h"
 #include "runtime/runtime_state.h"
-
 #include "vec/sink/result_sink.h"
 #include "vec/sink/vdata_stream_sender.h"
+#include "vec/sink/vmysql_table_sink.h"
 #include "vec/sink/vmysql_table_writer.h"
+#include "vec/sink/vodbc_table_sink.h"
 #include "vec/sink/vresult_file_sink.h"
 #include "vec/sink/vtablet_sink.h"
-#include "vec/sink/vmysql_table_sink.h"
 
 namespace doris {
 
@@ -81,7 +81,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
 
         // TODO: figure out good buffer size based on size of output row
         if (is_vec) {
-            tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs, thrift_sink.result_sink, 4096);
+            tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs,
+                                                          thrift_sink.result_sink, 4096);
         } else {
             tmp_sink = new ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024);
         }
@@ -139,7 +140,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
             return Status::InternalError("Missing data buffer sink.");
         }
         if (is_vec) {
-            doris::vectorized::VMysqlTableSink* vmysql_tbl_sink = new doris::vectorized::VMysqlTableSink(pool, row_desc, output_exprs);
+            doris::vectorized::VMysqlTableSink* vmysql_tbl_sink =
+                    new doris::vectorized::VMysqlTableSink(pool, row_desc, output_exprs);
             sink->reset(vmysql_tbl_sink);
         } else {
             // TODO: figure out good buffer size based on size of output row
@@ -156,8 +158,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         if (!thrift_sink.__isset.odbc_table_sink) {
             return Status::InternalError("Missing data odbc sink.");
         }
-        OdbcTableSink* odbc_tbl_sink = new OdbcTableSink(pool, row_desc, output_exprs);
-        sink->reset(odbc_tbl_sink);
+        if (is_vec) {
+            sink->reset(new vectorized::VOdbcTableSink(pool, row_desc, output_exprs));
+        } else {
+            sink->reset(new OdbcTableSink(pool, row_desc, output_exprs));
+        }
         break;
     }
 
diff --git a/be/src/exec/odbc_connector.cpp b/be/src/exec/odbc_connector.cpp
index fe5f05310f..2fd296dfee 100644
--- a/be/src/exec/odbc_connector.cpp
+++ b/be/src/exec/odbc_connector.cpp
@@ -26,6 +26,9 @@
 #include "exprs/expr.h"
 #include "runtime/primitive_type.h"
 #include "util/types.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
 
 #define ODBC_DISPOSE(h, ht, x, op)                                                        \
     {                                                                                     \
@@ -397,4 +400,146 @@ std::string ODBCConnector::handle_diagnostic_record(SQLHANDLE hHandle, SQLSMALLI
     return diagnostic_msg;
 }

+// Appends rows of `block`, starting at `start_send_row`, to the ODBC table
+// `table_name` as one multi-row `INSERT INTO ... VALUES (...),(...)` statement.
+//
+// Rows are rendered until the block is exhausted or `_insert_stmt_buffer` reaches
+// INSERT_BUFFER_SIZE, then the statement is executed. `*num_rows_sent` is
+// incremented once per rendered row; the caller advances `start_send_row` by it
+// and calls again until every row has been sent. `output_vexpr_ctxs[j]` provides
+// the declared type of column `j` of `block`.
+Status ODBCConnector::append(const std::string& table_name, vectorized::Block* block,
+                             const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs,
+                             uint32_t start_send_row, uint32_t* num_rows_sent) {
+    if (start_send_row >= block->rows()) {
+        // Nothing to send; never execute an incomplete "INSERT INTO t VALUES (".
+        return Status::OK();
+    }
+
+    _insert_stmt_buffer.clear();
+    std::string insert_stmt_utf8;
+    std::u16string insert_stmt;
+    {
+        SCOPED_TIMER(_convert_tuple_timer);
+        fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name);
+
+        int num_rows = block->rows();
+        int num_columns = block->columns();
+        for (int i = start_send_row; i < num_rows; ++i) {
+            (*num_rows_sent)++;
+
+            // Construct insert statement of odbc table
+            for (int j = 0; j < num_columns; ++j) {
+                if (j != 0) {
+                    fmt::format_to(_insert_stmt_buffer, "{}", ", ");
+                }
+                auto [item, size] = block->get_by_position(j).column->get_data_at(i);
+                if (item == nullptr) {
+                    // A null data pointer is rendered as SQL NULL.
+                    fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
+                    continue;
+                }
+                // Take the column type from the vectorized contexts passed in. The
+                // row-based `_output_expr_ctxs` member is presumably empty here: the
+                // vectorized sink never sets `_odbc_param.output_expr_ctxs`.
+                const PrimitiveType type = output_vexpr_ctxs[j]->root()->type().type;
+                switch (type) {
+                case TYPE_BOOLEAN:
+                case TYPE_TINYINT:
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const int8_t*>(item));
+                    break;
+                case TYPE_SMALLINT:
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const int16_t*>(item));
+                    break;
+                case TYPE_INT:
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const int32_t*>(item));
+                    break;
+                case TYPE_BIGINT:
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const int64_t*>(item));
+                    break;
+                case TYPE_FLOAT:
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const float*>(item));
+                    break;
+                case TYPE_DOUBLE:
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const double*>(item));
+                    break;
+                case TYPE_DATE:
+                case TYPE_DATETIME: {
+                    vectorized::VecDateTimeValue value =
+                            binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(
+                                    *reinterpret_cast<const int64_t*>(item));
+
+                    char buf[64];
+                    char* pos = value.to_string(buf);
+                    // The -1 drops the terminating '\0' (to_string() appears to return one past it).
+                    std::string_view str(buf, pos - buf - 1);
+                    fmt::format_to(_insert_stmt_buffer, "'{}'", str);
+                    break;
+                }
+                case TYPE_VARCHAR:
+                case TYPE_CHAR:
+                case TYPE_STRING: {
+                    // Double embedded single quotes (ANSI SQL escaping) so a value that
+                    // contains ' cannot end the literal early and break or inject into
+                    // the statement. NOTE: dialects that also treat backslash as an
+                    // escape character (e.g. MySQL by default) may need more escaping.
+                    _insert_stmt_buffer.push_back('\'');
+                    for (size_t k = 0; k < size; ++k) {
+                        if (item[k] == '\'') {
+                            _insert_stmt_buffer.push_back('\'');
+                        }
+                        _insert_stmt_buffer.push_back(item[k]);
+                    }
+                    _insert_stmt_buffer.push_back('\'');
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    DecimalV2Value value = *reinterpret_cast<const DecimalV2Value*>(item);
+                    fmt::format_to(_insert_stmt_buffer, "{}", value.to_string());
+                    break;
+                }
+                case TYPE_LARGEINT: {
+                    fmt::format_to(_insert_stmt_buffer, "{}",
+                                   *reinterpret_cast<const __int128*>(item));
+                    break;
+                }
+                default: {
+                    return Status::InternalError(
+                            fmt::format("can't convert this type to odbc type. type = {}",
+                                        static_cast<int>(type)));
+                }
+                }
+            }
+
+            if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) {
+                fmt::format_to(_insert_stmt_buffer, "{}", "),(");
+            } else {
+                // batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt
+                fmt::format_to(_insert_stmt_buffer, "{}", ")");
+                break;
+            }
+        }
+        insert_stmt_utf8.assign(_insert_stmt_buffer.data(), _insert_stmt_buffer.size());
+        // Translate the utf8 statement to utf16 to use the unicode (wide-char) ODBC API.
+        insert_stmt = utf8_to_wstring(insert_stmt_utf8);
+    }
+
+    {
+        SCOPED_TIMER(_result_send_timer);
+        // Report failures with a NUL-terminated copy of the statement:
+        // `_insert_stmt_buffer.data()` is not guaranteed to be NUL-terminated.
+        ODBC_DISPOSE(_stmt, SQL_HANDLE_STMT,
+                     SQLExecDirectW(_stmt, (SQLWCHAR*)(insert_stmt.c_str()), SQL_NTS),
+                     insert_stmt_utf8.c_str());
+    }
+    COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/exec/odbc_connector.h b/be/src/exec/odbc_connector.h
index 690289c47b..77066e3f44 100644
--- a/be/src/exec/odbc_connector.h
+++ b/be/src/exec/odbc_connector.h
@@ -34,6 +34,10 @@
 
 namespace doris {
 
+namespace vectorized {
+class VExprContext;
+}
+
 struct ODBCConnectorParam {
     std::string connect_string;
 
@@ -74,7 +78,11 @@ public:
     // write for ODBC table
     Status init_to_write(RuntimeProfile* profile);
     Status append(const std::string& table_name, RowBatch* batch, uint32_t start_send_row,
-                  uint32_t* num_row_sent);
+                  uint32_t* num_rows_sent);
+
+    Status append(const std::string& table_name, vectorized::Block* block,
+                  const std::vector<vectorized::VExprContext*>& _output_vexpr_ctxs,
+                  uint32_t start_send_row, uint32_t* num_rows_sent);
 
     // use in ODBC transaction
     Status begin_trans();  // should be call after connect and before query or init_to_write
diff --git a/be/src/runtime/odbc_table_sink.cpp b/be/src/runtime/odbc_table_sink.cpp
index 8804194b44..752f58d17d 100644
--- a/be/src/runtime/odbc_table_sink.cpp
+++ b/be/src/runtime/odbc_table_sink.cpp
@@ -58,7 +58,7 @@ Status OdbcTableSink::prepare(RuntimeState* state) {
     // Prepare the exprs to run.
     RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _mem_tracker));
     std::stringstream title;
-    title << "ODBC_TABLE_SINK (frag_id=" << state->fragment_instance_id() << ")";
+    title << _name << " (frag_id=" << state->fragment_instance_id() << ")";
     // create profile
     _profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
     return Status::OK();
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index afc95e77a9..5a81b42ccd 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -173,6 +173,7 @@ set(VEC_FILES
   sink/vtablet_sink.cpp
   sink/vmysql_table_writer.cpp
   sink/vmysql_table_sink.cpp
+  sink/vodbc_table_sink.cpp
   sink/vresult_file_sink.cpp
   runtime/vdatetime_value.cpp
   runtime/vdata_stream_recvr.cpp
diff --git a/be/src/runtime/odbc_table_sink.cpp b/be/src/vec/sink/vodbc_table_sink.cpp
similarity index 56%
copy from be/src/runtime/odbc_table_sink.cpp
copy to be/src/vec/sink/vodbc_table_sink.cpp
index 8804194b44..c92245ec5c 100644
--- a/be/src/runtime/odbc_table_sink.cpp
+++ b/be/src/vec/sink/vodbc_table_sink.cpp
@@ -15,58 +15,57 @@
 // specific language governing permissions and limitations
 // under the License.

-#include "runtime/odbc_table_sink.h"
+#include "vec/sink/vodbc_table_sink.h"

 #include <sstream>

-#include "exprs/expr.h"
-#include "runtime/runtime_state.h"
 #include "runtime/mem_tracker.h"
-#include "util/runtime_profile.h"
+#include "runtime/runtime_state.h"
 #include "util/debug_util.h"
+#include "util/runtime_profile.h"
+#include "vec/core/materialize_block.h"
+#include "vec/exprs/vexpr.h"

 namespace doris {
+namespace vectorized {

-OdbcTableSink::OdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
+VOdbcTableSink::VOdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                                const std::vector<TExpr>& t_exprs)
         : _pool(pool),
           _row_desc(row_desc),
           _t_output_expr(t_exprs),
-          _mem_tracker(MemTracker::CreateTracker(-1, "OdbcTableSink")) {
-    _name = "OOBC_TABLE_SINK";
+          _mem_tracker(MemTracker::create_tracker(-1, "VOdbcTableSink")) {
+    _name = "VOdbcTableSink";
 }

-OdbcTableSink::~OdbcTableSink() = default;
-
-Status OdbcTableSink::init(const TDataSink& t_sink) {
+Status VOdbcTableSink::init(const TDataSink& t_sink) {
     RETURN_IF_ERROR(DataSink::init(t_sink));
-    // From the thrift expressions create the real exprs.
-    RETURN_IF_ERROR(Expr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs));
-
     const TOdbcTableSink& t_odbc_sink = t_sink.odbc_table_sink;

     _odbc_param.connect_string = t_odbc_sink.connect_string;
-    _odbc_param.output_expr_ctxs = _output_expr_ctxs;
     _odbc_tbl = t_odbc_sink.table;
     _use_transaction = t_odbc_sink.use_transaction;

+    // Build the vectorized output expression contexts from the thrift exprs.
+    RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs));
     return Status::OK();
 }

-Status OdbcTableSink::prepare(RuntimeState* state) {
+Status VOdbcTableSink::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(DataSink::prepare(state));
     // Prepare the exprs to run.
-    RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _mem_tracker));
+    RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _row_desc, _mem_tracker));
     std::stringstream title;
-    title << "ODBC_TABLE_SINK (frag_id=" << state->fragment_instance_id() << ")";
+    title << _name << " (frag_id=" << state->fragment_instance_id() << ")";
     // create profile
     _profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
     return Status::OK();
 }

-Status OdbcTableSink::open(RuntimeState* state) {
+Status VOdbcTableSink::open(RuntimeState* state) {
     // Prepare the exprs to run.
-    RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));
+    RETURN_IF_ERROR(VExpr::open(_output_expr_ctxs, state));
+
     // create writer
     _writer.reset(new ODBCConnector(_odbc_param));
     RETURN_IF_ERROR(_writer->open());
@@ -77,30 +76,50 @@ Status OdbcTableSink::open(RuntimeState* state) {
     return Status::OK();
 }

-Status OdbcTableSink::send(RuntimeState* state, RowBatch* batch) {
-    if (batch == nullptr || batch->num_rows() == 0) {
-        return Status::OK();
+Status VOdbcTableSink::send(RuntimeState* state, RowBatch* batch) {
+    return Status::NotSupported(
+            "Not Implemented VOdbcTableSink::send(RuntimeState* state, RowBatch* batch)");
+}
+
+// Evaluates the output expressions over `block` and inserts the resulting rows
+// into the ODBC table, possibly spread over several INSERT statements.
+Status VOdbcTableSink::send(RuntimeState* state, Block* block) {
+    Status status = Status::OK();
+    if (block == nullptr || block->rows() == 0) {
+        return status;
     }
+
+    auto output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
+            _output_expr_ctxs, *block, status);
+    // Do not write a block whose output expressions failed to evaluate.
+    if (UNLIKELY(!status.ok())) return status;
+    materialize_block_inplace(output_block);
+
     uint32_t start_send_row = 0;
     uint32_t num_row_sent = 0;
-    while (start_send_row < batch->num_rows()) {
-        auto status = _writer->append(_odbc_tbl, batch, start_send_row, &num_row_sent);
+    while (start_send_row < output_block.rows()) {
+        status = _writer->append(_odbc_tbl, &output_block, _output_expr_ctxs, start_send_row,
+                                 &num_row_sent);
         if (UNLIKELY(!status.ok())) return status;
         start_send_row += num_row_sent;
         num_row_sent = 0;
     }
-    return Status::OK();
-}

-Status OdbcTableSink::close(RuntimeState* state, Status exec_status) {
-    if (_closed) {
-        return Status::OK();
-    }
-    Expr::close(_output_expr_ctxs, state);
-    if (exec_status.ok() && _use_transaction) {
-        RETURN_IF_ERROR(_writer->finish_trans());
-    }
-    return DataSink::close(state, exec_status);
+    return Status::OK();
 }

+// Closes the output expressions, commits the ODBC transaction if one is used and
+// execution succeeded, then delegates to DataSink::close().
+Status VOdbcTableSink::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
+    VExpr::close(_output_expr_ctxs, state);
+    if (exec_status.ok() && _use_transaction && _writer != nullptr) {
+        // Mirrors the row-based OdbcTableSink; without this the transaction is never committed.
+        RETURN_IF_ERROR(_writer->finish_trans());
+    }
+    return DataSink::close(state, exec_status);
 }
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/vodbc_table_sink.h b/be/src/vec/sink/vodbc_table_sink.h
new file mode 100644
index 0000000000..75c5348327
--- /dev/null
+++ b/be/src/vec/sink/vodbc_table_sink.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "common/status.h"
+#include "exec/data_sink.h"
+#include "exec/odbc_connector.h"
+#include "vec/sink/vmysql_table_writer.h"
+
+namespace doris {
+
+class RowDescriptor;
+class TExpr;
+class RuntimeState;
+class RuntimeProfile;
+class MemTracker;
+namespace vectorized {
+
+// Vectorized data sink that writes the rows of incoming blocks into an ODBC
+// table: the output expressions are evaluated over each block and the results
+// are inserted through an ODBCConnector.
+class VOdbcTableSink : public DataSink {
+public:
+    VOdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                   const std::vector<TExpr>& t_exprs);
+
+    Status init(const TDataSink& thrift_sink) override;
+
+    Status prepare(RuntimeState* state) override;
+
+    Status open(RuntimeState* state) override;
+
+    // Row-batch input is not supported by this vectorized sink.
+    Status send(RuntimeState* state, RowBatch* batch) override;
+
+    Status send(RuntimeState* state, vectorized::Block* block) override;
+
+    // Finalizes the sink after the last send(); `exec_status` is the execution
+    // status of the fragment. Further send() calls are illegal after close().
+    Status close(RuntimeState* state, Status exec_status) override;
+
+    RuntimeProfile* profile() override { return _profile; }
+
+private:
+    // owned by RuntimeState
+    ObjectPool* _pool;
+    const RowDescriptor& _row_desc;
+    const std::vector<TExpr>& _t_output_expr;
+
+    std::vector<VExprContext*> _output_expr_ctxs;
+
+    // Created in prepare() from the RuntimeState object pool; nullptr until then.
+    RuntimeProfile* _profile = nullptr;
+    std::shared_ptr<MemTracker> _mem_tracker;
+
+    ODBCConnectorParam _odbc_param;
+    std::string _odbc_tbl;
+    std::unique_ptr<ODBCConnector> _writer;
+    // whether use transaction
+    bool _use_transaction = false;
+};
+} // namespace vectorized
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org