You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/12/13 13:53:41 UTC

[incubator-doris] branch master updated: [ODBC] Support ODBC Sink for insert into data to ODBC external table (#5033)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 115d433  [ODBC] Support ODBC Sink for insert into data to ODBC external table (#5033)
115d433 is described below

commit 115d4332aaf40adc380b0ed8570b0b57dc3896fd
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Sun Dec 13 21:53:27 2020 +0800

    [ODBC] Support ODBC Sink for insert into data to ODBC external table (#5033)
    
    issue:#5031
    
    1. Support an ODBC sink so that `insert into` can write data to an ODBC external table.
    2. Support transactions in the ODBC sink to make sure the inserted data is written atomically.
    3. Update the documentation for the ODBC sink.
---
 be/src/exec/CMakeLists.txt                         |   2 +-
 be/src/exec/data_sink.cpp                          |  11 +-
 .../exec/{odbc_scanner.cpp => odbc_connector.cpp}  | 194 ++++++++++++++++++++-
 be/src/exec/{odbc_scanner.h => odbc_connector.h}   |  43 +++--
 be/src/exec/odbc_scan_node.cpp                     |   2 +-
 be/src/exec/odbc_scan_node.h                       |   6 +-
 be/src/runtime/CMakeLists.txt                      |   1 +
 be/src/runtime/odbc_table_sink.cpp                 |  90 ++++++++++
 be/src/runtime/odbc_table_sink.h                   |  81 +++++++++
 docs/en/extending-doris/odbc-of-doris.md           |  20 ++-
 docs/zh-CN/extending-doris/odbc-of-doris.md        |  21 ++-
 .../java/org/apache/doris/catalog/OdbcTable.java   |  14 ++
 .../java/org/apache/doris/planner/DataSink.java    |   3 +
 .../org/apache/doris/planner/OdbcScanNode.java     |  18 +-
 .../org/apache/doris/planner/OdbcTableSink.java    |  74 ++++++++
 .../java/org/apache/doris/qe/SessionVariable.java  |  10 +-
 .../org/apache/doris/planner/QueryPlanTest.java    |  19 ++
 gensrc/thrift/DataSinks.thrift                     |  10 +-
 18 files changed, 574 insertions(+), 45 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 3e093e3..6ba785e 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -101,7 +101,7 @@ set(EXEC_FILES
     parquet_reader.cpp
     parquet_writer.cpp
     orc_scanner.cpp
-    odbc_scanner.cpp
+    odbc_connector.cpp
     json_scanner.cpp
     assert_num_rows_node.cpp
 )
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 47bea39..218e29f 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -31,6 +31,7 @@
 #include "runtime/export_sink.h"
 #include "runtime/memory_scratch_sink.h"
 #include "runtime/mysql_table_sink.h"
+#include "runtime/odbc_table_sink.h"
 #include "runtime/result_sink.h"
 #include "runtime/runtime_state.h"
 #include "util/logging.h"
@@ -93,7 +94,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                 "Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON");
 #endif
     }
-
+    case TDataSinkType::ODBC_TABLE_SINK: {
+        if (!thrift_sink.__isset.odbc_table_sink) {
+            return Status::InternalError("Missing data odbc sink.");
+        }
+        OdbcTableSink* odbc_tbl_sink = new OdbcTableSink(pool,
+                                                         row_desc, output_exprs);
+        sink->reset(odbc_tbl_sink);
+        break;
+    }
     case TDataSinkType::DATA_SPLIT_SINK: {
         if (!thrift_sink.__isset.split_sink) {
             return Status::InternalError("Missing data split buffer sink.");
diff --git a/be/src/exec/odbc_scanner.cpp b/be/src/exec/odbc_connector.cpp
similarity index 54%
rename from be/src/exec/odbc_scanner.cpp
rename to be/src/exec/odbc_connector.cpp
index 1df47ad..32a0079 100644
--- a/be/src/exec/odbc_scanner.cpp
+++ b/be/src/exec/odbc_connector.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/odbc_scanner.h"
+#include "exec/odbc_connector.h"
 
 #include <sqlext.h>
 
@@ -23,7 +23,9 @@
 #include <codecvt>
 
 #include "common/logging.h"
+#include "exprs/expr.h"
 #include "runtime/primitive_type.h"
+#include "util/types.h"
 
 #define ODBC_DISPOSE(h, ht, x, op)                                        \
     {                                                                     \
@@ -48,10 +50,11 @@ static std::u16string utf8_to_wstring(const std::string& str) {
 
 namespace doris {
 
-ODBCScanner::ODBCScanner(const ODBCScannerParam& param)
+ODBCConnector::ODBCConnector(const ODBCConnectorParam& param)
         : _connect_string(param.connect_string),
           _sql_str(param.query_string),
           _tuple_desc(param.tuple_desc),
+          _output_expr_ctxs(std::move(param.output_expr_ctxs)),
           _is_open(false),
           _field_num(0),
           _row_count(0),
@@ -59,7 +62,12 @@ ODBCScanner::ODBCScanner(const ODBCScannerParam& param)
           _dbc(nullptr),
           _stmt(nullptr) {}
 
-ODBCScanner::~ODBCScanner() {
+ODBCConnector::~ODBCConnector() {
+    // do not commit transaction, roll back
+    if (_is_in_transaction) {
+        abort_trans();
+    }
+
     if (_stmt != nullptr) {
         SQLFreeHandle(SQL_HANDLE_STMT, _stmt);
     }
@@ -74,7 +82,7 @@ ODBCScanner::~ODBCScanner() {
     }
 }
 
-Status ODBCScanner::open() {
+Status ODBCConnector::open() {
     if (_is_open) {
         LOG(INFO) << "this scanner already opened";
         return Status::OK();
@@ -102,7 +110,7 @@ Status ODBCScanner::open() {
     return Status::OK();
 }
 
-Status ODBCScanner::query() {
+Status ODBCConnector::query() {
     if (!_is_open) {
         return Status::InternalError("Query before open.");
     }
@@ -111,7 +119,7 @@ Status ODBCScanner::query() {
     ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC, SQLAllocHandle(SQL_HANDLE_STMT, _dbc, &_stmt),
                  "alloc statement");
 
-    // Translate utf8 string to utf16 to use unicode code
+    // Translate utf8 string to utf16 to use unicode encoding
     auto wquery = utf8_to_wstring(_sql_str);
     ODBC_DISPOSE(_stmt, SQL_HANDLE_STMT,
                  SQLExecDirectW(_stmt, (SQLWCHAR*)(wquery.c_str()), SQL_NTS), "exec direct");
@@ -156,7 +164,7 @@ Status ODBCScanner::query() {
     return Status::OK();
 }
 
-Status ODBCScanner::get_next_row(bool* eos) {
+Status ODBCConnector::get_next_row(bool* eos) {
     if (!_is_open) {
         return Status::InternalError("GetNextRow before open.");
     }
@@ -172,7 +180,175 @@ Status ODBCScanner::get_next_row(bool* eos) {
     return Status::OK();
 }
 
-Status ODBCScanner::error_status(const std::string& prefix, const std::string& error_msg) {
+Status ODBCConnector::init_to_write() {
+    if (!_is_open) {
+        return Status::InternalError( "Init before open.");
+    }
+
+    // Allocate a statement handle
+    ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC, SQLAllocHandle(SQL_HANDLE_STMT, _dbc, &_stmt), "alloc statement");
+
+    return Status::OK();
+}
+
+Status ODBCConnector::append(const std::string& table_name, RowBatch *batch) {
+    if (batch == nullptr || batch->num_rows() == 0) {
+        return Status::OK();
+    }
+
+    int num_rows = batch->num_rows();
+    for (int i = 0; i < num_rows; ++i) {
+        RETURN_IF_ERROR(insert_row(table_name, batch->get_row(i)));
+    }
+
+    return Status::OK();
+}
+
+Status ODBCConnector::insert_row(const std::string& table_name, TupleRow *row) {
+    std::stringstream ss;
+
+    // Construct Insert statement of odbc table
+    ss << "INSERT INTO " << table_name << " VALUES (";
+    int num_columns = _output_expr_ctxs.size();
+    for (int i = 0; i < num_columns; ++i) {
+        if (i != 0) {
+            ss << ", ";
+        }
+        void* item = _output_expr_ctxs[i]->get_value(row);
+        if (item == nullptr) {
+            ss << "NULL";
+            continue;
+        }
+        switch (_output_expr_ctxs[i]->root()->type().type) {
+            case TYPE_BOOLEAN:
+            case TYPE_TINYINT:
+                ss << (int)*static_cast<int8_t*>(item);
+                break;
+            case TYPE_SMALLINT:
+                ss << *static_cast<int16_t*>(item);
+                break;
+            case TYPE_INT:
+                ss << *static_cast<int32_t*>(item);
+                break;
+            case TYPE_BIGINT:
+                ss << *static_cast<int64_t*>(item);
+                break;
+            case TYPE_FLOAT:
+                ss << *static_cast<float*>(item);
+                break;
+            case TYPE_DOUBLE:
+                ss << *static_cast<double*>(item);
+                break;
+            case TYPE_DATE:
+            case TYPE_DATETIME: {
+                char buf[64];
+                const DateTimeValue* time_val = (const DateTimeValue*)(item);
+                time_val->to_string(buf);
+                ss << "\'" << buf << "\'";
+                break;
+            }
+            case TYPE_VARCHAR:
+            case TYPE_CHAR: {
+                const StringValue* string_val = (const StringValue*)(item);
+
+                if (string_val->ptr == NULL) {
+                    if (string_val->len == 0) {
+                        ss << "\'\'";
+                    } else {
+                        ss << "NULL";
+                    }
+                } else {
+                    ss << "\'";
+                    for (int j = 0; j < string_val->len ; ++j) {
+                        ss << string_val->ptr[j];
+                    }
+                    ss << "\'";
+                }
+                break;
+            }
+            case TYPE_DECIMAL: {
+                const DecimalValue* decimal_val = reinterpret_cast<const DecimalValue*>(item);
+                std::string decimal_str;
+                int output_scale = _output_expr_ctxs[i]->root()->output_scale();
+
+                if (output_scale > 0 && output_scale <= 30) {
+                    decimal_str = decimal_val->to_string(output_scale);
+                } else {
+                    decimal_str = decimal_val->to_string();
+                }
+                ss << decimal_str;
+                break;
+            }
+            case TYPE_DECIMALV2: {
+                const DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128*>(item)->value);
+                std::string decimal_str;
+                int output_scale = _output_expr_ctxs[i]->root()->output_scale();
+
+                if (output_scale > 0 && output_scale <= 30) {
+                    decimal_str = decimal_val.to_string(output_scale);
+                } else {
+                    decimal_str = decimal_val.to_string();
+                }
+                ss << decimal_str;
+                break;
+            }
+
+            default: {
+                std::stringstream err_ss;
+                err_ss << "can't convert this type to mysql type. type = " <<
+                       _output_expr_ctxs[i]->root()->type();
+                return Status::InternalError(err_ss.str());
+            }
+        }
+    }
+    ss << ")";
+
+    // Translate utf8 string to utf16 to use unicode encoding
+    auto insert_stmt = utf8_to_wstring(ss.str());
+    ODBC_DISPOSE(_stmt, SQL_HANDLE_STMT, SQLExecDirectW(_stmt, (SQLWCHAR*)(insert_stmt.c_str()), SQL_NTS), ss.str().c_str());
+
+    return Status::OK();
+}
+
+Status ODBCConnector::begin_trans() {
+    if (!_is_open) {
+        return Status::InternalError("Begin transaction before open.");
+    }
+
+    ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC,
+                 SQLSetConnectAttr(_dbc, SQL_ATTR_AUTOCOMMIT, (SQLPOINTER)SQL_AUTOCOMMIT_OFF, SQL_IS_UINTEGER),
+                 "Begin transcation");
+    _is_in_transaction = true;
+
+    return Status::OK();
+}
+
+Status ODBCConnector::abort_trans() {
+    if (!_is_in_transaction) {
+        return Status::InternalError("Abort transaction before begin trans.");
+    }
+
+    ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC,
+                 SQLEndTran(SQL_HANDLE_DBC, _dbc, SQL_ROLLBACK),
+                 "Abort transcation");
+
+    return Status::OK();
+}
+
+Status ODBCConnector::finish_trans() {
+    if (!_is_in_transaction) {
+        return Status::InternalError("Abort transaction before begin trans.");
+    }
+
+    ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC,
+                 SQLEndTran(SQL_HANDLE_DBC, _dbc, SQL_COMMIT),
+                 "commit transcation");
+    _is_in_transaction = false;
+
+    return Status::OK();
+}
+
+Status ODBCConnector::error_status(const std::string& prefix, const std::string& error_msg) {
     std::stringstream msg;
     msg << prefix << " Error: " << error_msg;
     LOG(WARNING) << msg.str();
@@ -185,7 +361,7 @@ Status ODBCScanner::error_status(const std::string& prefix, const std::string& e
 //      hHandle     ODBC handle
 //      hType       Type of handle (HANDLE_STMT, HANDLE_ENV, HANDLE_DBC)
 //      RetCode     Return code of failing command
-std::string ODBCScanner::handle_diagnostic_record(SQLHANDLE hHandle, SQLSMALLINT hType,
+std::string ODBCConnector::handle_diagnostic_record(SQLHANDLE hHandle, SQLSMALLINT hType,
                                                   RETCODE RetCode) {
     SQLSMALLINT rec = 0;
     SQLINTEGER error;
diff --git a/be/src/exec/odbc_scanner.h b/be/src/exec/odbc_connector.h
similarity index 68%
rename from be/src/exec/odbc_scanner.h
rename to be/src/exec/odbc_connector.h
index e384bca..1e92022 100644
--- a/be/src/exec/odbc_scanner.h
+++ b/be/src/exec/odbc_connector.h
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef DORIS_BE_SRC_QUERY_EXEC_ODBC_SCANNER_H
-#define DORIS_BE_SRC_QUERY_EXEC_ODBC_SCANNER_H
+#ifndef DORIS_BE_SRC_QUERY_EXEC_ODBC_CONNECTOR_H
+#define DORIS_BE_SRC_QUERY_EXEC_ODBC_CONNECTOR_H
 
 #include <sql.h>
 
@@ -26,17 +26,23 @@
 #include <string>
 #include <vector>
 
+#include "exprs/expr_context.h"
+#include "runtime/row_batch.h"
 #include "common/status.h"
 #include "gen_cpp/Types_types.h"
 #include "runtime/descriptors.h"
 
 namespace doris {
 
-struct ODBCScannerParam {
+struct ODBCConnectorParam {
     std::string connect_string;
-    std::string query_string;
 
+    // only use in query
+    std::string query_string;
     const TupleDescriptor* tuple_desc;
+
+    // only use in write
+    std::vector<ExprContext*> output_expr_ctxs;
 };
 
 // Because the DataBinding have the mem alloc, so
@@ -52,32 +58,47 @@ struct DataBinding : public boost::noncopyable {
     ~DataBinding() { free(target_value_ptr); }
 };
 
-// ODBC Scanner for scan data from ODBC
-class ODBCScanner {
+// ODBC Connector for scanning data from, and writing data to, ODBC tables
+class ODBCConnector {
 public:
-    ODBCScanner(const ODBCScannerParam& param);
-    ~ODBCScanner();
+    ODBCConnector(const ODBCConnectorParam& param);
+    ~ODBCConnector();
 
     Status open();
-
     // query for ODBC table
     Status query();
-
     Status get_next_row(bool* eos);
 
-    const DataBinding& get_column_data(int i) const { return _columns_data.at(i); }
+    // write for ODBC table
+    Status init_to_write();
+    Status append(const std::string& table_name, RowBatch* batch);
 
+    // use in ODBC transaction
+    Status begin_trans();  // should be call after connect and before query or init_to_write
+    Status abort_trans();  // should be call after transaction abort
+    Status finish_trans(); // should be call after transaction commit
+
+    const DataBinding& get_column_data(int i) const { return _columns_data.at(i); }
 private:
+    Status insert_row(const string& table_name, TupleRow* row);
+
     static Status error_status(const std::string& prefix, const std::string& error_msg);
 
     static std::string handle_diagnostic_record(SQLHANDLE hHandle, SQLSMALLINT hType,
                                                 RETCODE RetCode);
 
     std::string _connect_string;
+    // only use in query
     std::string _sql_str;
     const TupleDescriptor* _tuple_desc;
 
+    // only use in write
+    const std::vector<ExprContext*> _output_expr_ctxs;
+
     bool _is_open;
+    bool _is_in_transaction;
+
+
     SQLSMALLINT _field_num;
     uint64_t _row_count;
 
diff --git a/be/src/exec/odbc_scan_node.cpp b/be/src/exec/odbc_scan_node.cpp
index ce4d71b..b1d86fd 100644
--- a/be/src/exec/odbc_scan_node.cpp
+++ b/be/src/exec/odbc_scan_node.cpp
@@ -65,7 +65,7 @@ Status OdbcScanNode::prepare(RuntimeState* state) {
     _odbc_param.query_string = std::move(_query_string);
     _odbc_param.tuple_desc = _tuple_desc;
 
-    _odbc_scanner.reset(new (std::nothrow) ODBCScanner(_odbc_param));
+    _odbc_scanner.reset(new (std::nothrow) ODBCConnector(_odbc_param));
 
     if (_odbc_scanner.get() == nullptr) {
         return Status::InternalError("new a odbc scanner failed.");
diff --git a/be/src/exec/odbc_scan_node.h b/be/src/exec/odbc_scan_node.h
index 5763b44..393d9ac 100644
--- a/be/src/exec/odbc_scan_node.h
+++ b/be/src/exec/odbc_scan_node.h
@@ -20,7 +20,7 @@
 
 #include <memory>
 
-#include "exec/odbc_scanner.h"
+#include "exec/odbc_connector.h"
 #include "exec/scan_node.h"
 #include "runtime/descriptors.h"
 
@@ -82,8 +82,8 @@ private:
     std::unique_ptr<MemPool> _tuple_pool;
 
     // Scanner of ODBC.
-    std::unique_ptr<ODBCScanner> _odbc_scanner;
-    ODBCScannerParam _odbc_param;
+    std::unique_ptr<ODBCConnector> _odbc_scanner;
+    ODBCConnectorParam _odbc_param;
     // Helper class for converting text to other types;
     std::unique_ptr<TextConverter> _text_converter;
     // Current tuple.
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 0176d74..db32d1a 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -108,6 +108,7 @@ set(RUNTIME_FILES
     memory/chunk_allocator.cpp
     cache/result_node.cpp
     cache/result_cache.cpp
+    odbc_table_sink.cpp	
 )
 
 if (WITH_MYSQL)
diff --git a/be/src/runtime/odbc_table_sink.cpp b/be/src/runtime/odbc_table_sink.cpp
new file mode 100644
index 0000000..e02d978
--- /dev/null
+++ b/be/src/runtime/odbc_table_sink.cpp
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/odbc_table_sink.h"
+
+#include <sstream>
+
+#include "exprs/expr.h"
+#include "runtime/runtime_state.h"
+#include "runtime/mem_tracker.h"
+#include "util/runtime_profile.h"
+#include "util/debug_util.h"
+
+namespace doris {
+
+OdbcTableSink::OdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                               const std::vector<TExpr>& t_exprs)
+        : _pool(pool),
+          _row_desc(row_desc),
+          _t_output_expr(t_exprs),
+          _mem_tracker(MemTracker::CreateTracker(-1, "OdbcTableSink")) {}
+
+OdbcTableSink::~OdbcTableSink() {}
+
+Status OdbcTableSink::init(const TDataSink& t_sink) {
+    RETURN_IF_ERROR(DataSink::init(t_sink));
+    // From the thrift expressions create the real exprs.
+    RETURN_IF_ERROR(Expr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs));
+
+    const TOdbcTableSink& t_odbc_sink = t_sink.odbc_table_sink;
+
+    _odbc_param.connect_string = t_odbc_sink.connect_string;
+    _odbc_param.output_expr_ctxs = _output_expr_ctxs;
+    _odbc_tbl = t_odbc_sink.table;
+    _use_transaction = t_odbc_sink.use_transaction;
+
+    return Status::OK();
+}
+
+Status OdbcTableSink::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSink::prepare(state));
+    // Prepare the exprs to run.
+    RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _mem_tracker));
+    std::stringstream title;
+    title << "OdbcTableSink (frag_id=" << state->fragment_instance_id() << ")";
+    // create profile
+    _profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
+    return Status::OK();
+}
+
+Status OdbcTableSink::open(RuntimeState* state) {
+    // Prepare the exprs to run.
+    RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));
+    // create writer
+    _writer.reset(new ODBCConnector(_odbc_param));
+    RETURN_IF_ERROR(_writer->open());
+    if (_use_transaction) {
+        RETURN_IF_ERROR(_writer->begin_trans());
+    }
+    RETURN_IF_ERROR(_writer->init_to_write());
+    return Status::OK();
+}
+
+Status OdbcTableSink::send(RuntimeState* state, RowBatch* batch) {
+    return _writer->append(_odbc_tbl, batch);
+}
+
+Status OdbcTableSink::close(RuntimeState* state, Status exec_status) {
+    Expr::close(_output_expr_ctxs, state);
+    if (exec_status.ok() && _use_transaction) {
+        RETURN_IF_ERROR(_writer->finish_trans());
+    }
+    return Status::OK();
+}
+
+}
diff --git a/be/src/runtime/odbc_table_sink.h b/be/src/runtime/odbc_table_sink.h
new file mode 100644
index 0000000..385075b
--- /dev/null
+++ b/be/src/runtime/odbc_table_sink.h
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_RUNTIME_ODBC_TABLE_SINK_H
+#define DORIS_BE_RUNTIME_ODBC_TABLE_SINK_H
+
+#include <vector>
+
+#include "common/status.h"
+#include "exec/data_sink.h"
+#include "exec/odbc_connector.h"
+
+namespace doris {
+
+class RowDescriptor;
+class TExpr;
+class TOdbcTableSink;
+class RuntimeState;
+class RuntimeProfile;
+class ExprContext;
+class MemTracker;
+
+// This class is a data sink that writes its input rows to an ODBC table
+class OdbcTableSink : public DataSink {
+public:
+    OdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                   const std::vector<TExpr>& t_exprs);
+
+    virtual ~OdbcTableSink();
+
+    virtual Status init(const TDataSink& thrift_sink);
+
+    virtual Status prepare(RuntimeState* state);
+
+    virtual Status open(RuntimeState* state);
+
+    // Write the rows in 'batch' to the ODBC table through the ODBC connector.
+    // Returns the status reported by the connector's append call.
+    virtual Status send(RuntimeState* state, RowBatch* batch);
+
+    // Close the output exprs and, when 'exec_status' is OK and transactions
+    // are enabled, commit the ODBC transaction.
+    virtual Status close(RuntimeState* state, Status exec_status);
+
+    virtual RuntimeProfile* profile() {
+        return _profile;
+    }
+
+private:
+    ObjectPool* _pool;
+    const RowDescriptor& _row_desc;
+    const std::vector<TExpr>& _t_output_expr;
+
+    std::vector<ExprContext*> _output_expr_ctxs;
+    ODBCConnectorParam _odbc_param;
+    std::string _odbc_tbl;
+    std::unique_ptr<ODBCConnector> _writer;
+    // whether use transaction
+    bool _use_transaction;
+
+    RuntimeProfile* _profile;
+    std::shared_ptr<MemTracker> _mem_tracker;
+};
+
+}
+
+#endif
diff --git a/docs/en/extending-doris/odbc-of-doris.md b/docs/en/extending-doris/odbc-of-doris.md
index 0f081c4..84f4d8f 100644
--- a/docs/en/extending-doris/odbc-of-doris.md
+++ b/docs/en/extending-doris/odbc-of-doris.md
@@ -31,6 +31,7 @@ ODBC external table of Doris provides Doris access to external tables through th
 
 1. Support various data sources to access Doris
 2. Support Doris query with tables in various data sources to perform more complex analysis operations
+3. Use insert into to write the query results executed by Doris to the external data source
 
 
 This document mainly introduces the implementation principle and usage of this ODBC external table.
@@ -130,13 +131,30 @@ FileUsage       = 1
 
 ### Query usage
 
-After the ODBC external table is built in Doris, it is no different from ordinary Doris tables except that the data model (rollup, pre aggregation, materialized view, etc.) in Doris cannot be used.
+After the ODBC external table is created in Doris, it is no different from ordinary Doris tables except that the data model (rollup, pre aggregation, materialized view, etc.) in Doris cannot be used.
 
 ```
 select * from oracle_table where k1 > 1000 and k3 ='term' or k4 like '%doris'
 ```
 
+### Data write
 
+After the ODBC external table is created in Doris, data can be written to it directly with the `insert into` statement: the query results of Doris can be written to the ODBC external table, or data can be imported from one ODBC table into another.
+
+```
+insert into oracle_table values(1, "doris");
+insert into oracle_table select * from postgre_table;
+```
+#### Transaction
+
+
+Doris writes data to the external table in a series of batches. If the import is interrupted, the data written so far may need to be rolled back. Therefore, the ODBC external table supports transactions when data is written. Transaction support is enabled through the session variable `enable_odbc_transcation`.
+
+```
+set enable_odbc_transcation = true; 
+```
+
+Transactions ensure the atomicity of writes to the ODBC external table, but they reduce write performance to some extent, so consider enabling this feature only when it is needed.
 
 ## Data type mapping
 
diff --git a/docs/zh-CN/extending-doris/odbc-of-doris.md b/docs/zh-CN/extending-doris/odbc-of-doris.md
index 61f13af..91374db 100644
--- a/docs/zh-CN/extending-doris/odbc-of-doris.md
+++ b/docs/zh-CN/extending-doris/odbc-of-doris.md
@@ -30,6 +30,7 @@ ODBC External Table Of Doris 提供了Doris通过数据库访问的标准接口(
 
  1. 支持各种数据源接入Doris
  2. 支持Doris与各种数据源中的表联合查询,进行更加复杂的分析操作
+  3. 通过insert into将Doris执行的查询结果写入外部的数据源
 
 本文档主要介绍该功能的实现原理、使用方式等。
 
@@ -130,9 +131,27 @@ FileUsage       = 1
 
 
 ```
-select * from oracle_table where k1 > 1000 and k3 ='term' or k4 like '%doris'
+select * from oracle_table where k1 > 1000 and k3 ='term' or k4 like '%doris';
 ```
 
+### 数据写入
+
+在Doris中建立ODBC外表后,可以通过insert into语句直接写入数据,也可以将Doris执行完查询之后的结果写入ODBC外表,或者是从一个ODBC外表将数据导入另一个ODBC外表。
+
+
+```
+insert into oracle_table values(1, "doris");
+insert into oracle_table select * from postgre_table;
+```
+#### 事务
+
+Doris的数据是由一组batch的方式写入外部表的,如果中途导入中断,之前写入数据可能需要回滚。所以ODBC外表支持数据写入时的事务,事务的支持需要通过session variable:`enable_odbc_transcation`设置。
+
+```
+set enable_odbc_transcation = true; 
+```
+
+事务保证了ODBC外表数据写入的原子性,但是一定程度上会降低数据写入的性能,可以考虑酌情开启该功能。
 
 
 ## 类型匹配
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcTable.java
index e2f76d6..a42fbd7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcTable.java
@@ -55,6 +55,7 @@ public class OdbcTable extends Table {
     private static final String ODBC_DRIVER = "driver";
     private static final String ODBC_TYPE = "odbc_type";
 
+    // Mapping of the ODBC external table types that Doris currently supports
     private static Map<String, TOdbcTableType> TABLE_TYPE_MAP;
     static {
         Map<String, TOdbcTableType> tempMap = new HashMap<>();
@@ -64,6 +65,19 @@ public class OdbcTable extends Table {
         TABLE_TYPE_MAP = Collections.unmodifiableMap(tempMap);
     }
 
+    // For different databases, special characters need to be escaped
+    private static String mysqlProperName(String name) {
+        return "`" + name + "`";
+    }
+    
+    public static String databaseProperName(TOdbcTableType tableType, String name) {
+        switch (tableType) {
+            case MYSQL:
+                return mysqlProperName(name);
+        }
+        return name;
+    }
+
     private String odbcCatalogResourceName;
     private String host;
     private String port;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java
index 63eb5aa..a4f8970 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java
@@ -18,6 +18,7 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.catalog.MysqlTable;
+import org.apache.doris.catalog.OdbcTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.thrift.TDataSink;
@@ -54,6 +55,8 @@ public abstract class DataSink {
     public static DataSink createDataSink(Table table) throws AnalysisException {
         if (table instanceof MysqlTable) {
             return new MysqlTableSink((MysqlTable) table);
+        } else if (table instanceof OdbcTable) {
+            return new OdbcTableSink((OdbcTable)table);
         } else {
             throw new AnalysisException("Unknown table type " + table.getType());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java
index 0d6ce80..7e37729 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcScanNode.java
@@ -50,18 +50,6 @@ import java.util.List;
 public class OdbcScanNode extends ScanNode {
     private static final Logger LOG = LogManager.getLogger(OdbcScanNode.class);
 
-    private static String mysqlProperName(String name) {
-        return "`" + name + "`";
-    }
-
-    private static String databaseProperName(TOdbcTableType tableType, String name) {
-        switch (tableType) {
-            case MYSQL:
-                return mysqlProperName(name);
-        }
-        return name;
-    }
-
     // Now some database have different function call like doris, now doris do not
     // push down the function call except MYSQL
     private static boolean shouldPushDownConjunct(TOdbcTableType tableType, Expr expr) {
@@ -88,7 +76,7 @@ public class OdbcScanNode extends ScanNode {
         super(id, desc, "SCAN ODBC");
         connectString = tbl.getConnectString();
         odbcType = tbl.getOdbcTableType();
-        tblName = databaseProperName(odbcType, tbl.getOdbcTableName());
+        tblName = OdbcTable.databaseProperName(odbcType, tbl.getOdbcTableName());
     }
 
     @Override
@@ -156,7 +144,7 @@ public class OdbcScanNode extends ScanNode {
                 continue;
             }
             Column col = slot.getColumn();
-            columns.add(databaseProperName(odbcType, col.getName()));
+            columns.add(OdbcTable.databaseProperName(odbcType, col.getName()));
         }
         // this happens when count(*)
         if (0 == columns.size()) {
@@ -176,7 +164,7 @@ public class OdbcScanNode extends ScanNode {
         for (SlotRef slotRef : slotRefs) {
             SlotRef tmpRef = (SlotRef) slotRef.clone();
             tmpRef.setTblName(null);
-            tmpRef.setLabel(databaseProperName(odbcType, tmpRef.getColumnName()));
+            tmpRef.setLabel(OdbcTable.databaseProperName(odbcType, tmpRef.getColumnName()));
             sMap.put(slotRef, tmpRef);
         }
         ArrayList<Expr> odbcConjuncts = Expr.cloneList(conjuncts, sMap);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcTableSink.java
new file mode 100644
index 0000000..f75e8f9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OdbcTableSink.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.catalog.OdbcTable;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TDataSink;
+import org.apache.doris.thrift.TDataSinkType;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TOdbcTableSink;
+import org.apache.doris.thrift.TOdbcTableType;
+
+/**
+ * Data sink used when the target of an INSERT is an ODBC external table.
+ *
+ * It is handed to the backend as a {@link TOdbcTableSink} that carries the
+ * connect string, the external table name and the transaction flag.
+ */
+public class OdbcTableSink extends DataSink {
+    // Type of the external database; only shown in the explain output.
+    private final TOdbcTableType odbcType;
+    // External table name after OdbcTable.databaseProperName(); this is what the backend receives.
+    private final String tblName;
+    // Name of the table inside Doris; only shown in the explain output.
+    private final String originTblName;
+    // Connect string taken from the catalog table.
+    private final String connectString;
+    // Value of the session's ODBC transaction switch at the time the sink is built.
+    private final boolean useTransaction;
+
+    /**
+     * Builds the sink from the catalog table and the current session.
+     *
+     * The transaction flag is read through ConnectContext.get(), so that call
+     * must not return null when a sink is created.
+     */
+    public OdbcTableSink(OdbcTable odbcTable) {
+        connectString = odbcTable.getConnectString();
+        originTblName = odbcTable.getName();
+        odbcType = odbcTable.getOdbcTableType();
+        // Static helper: invoke it through the class, as OdbcScanNode does.
+        tblName = OdbcTable.databaseProperName(odbcType, odbcTable.getOdbcTableName());
+        useTransaction = ConnectContext.get().getSessionVariable().isEnableOdbcTransaction();
+    }
+
+    /** Returns the EXPLAIN text of this sink; every line starts with {@code prefix}. */
+    @Override
+    public String getExplainString(String prefix, TExplainLevel explainLevel) {
+        StringBuilder strBuilder = new StringBuilder();
+        strBuilder.append(prefix + "ODBC TABLE SINK:\n");
+        strBuilder.append(prefix + "TABLENAME IN DORIS: ").append(originTblName).append("\n");
+        strBuilder.append(prefix + "TABLE TYPE: ").append(odbcType.toString()).append("\n");
+        strBuilder.append(prefix + "TABLENAME OF EXTERNAL TABLE: ").append(tblName).append("\n");
+        strBuilder.append(prefix + "EnableTransaction: ").append(useTransaction ? "true" : "false").append("\n");
+        return strBuilder.toString();
+    }
+
+    /** Packs connect string, table name and transaction flag into an ODBC_TABLE_SINK TDataSink. */
+    @Override
+    protected TDataSink toThrift() {
+        TDataSink tDataSink = new TDataSink(TDataSinkType.ODBC_TABLE_SINK);
+        TOdbcTableSink odbcTableSink = new TOdbcTableSink();
+        odbcTableSink.setConnectString(connectString);
+        odbcTableSink.setTable(tblName);
+        odbcTableSink.setUseTransaction(useTransaction);
+        tDataSink.setOdbcTableSink(odbcTableSink);
+        return tDataSink;
+    }
+
+    /** Always returns null. */
+    @Override
+    public PlanNodeId getExchNodeId() {
+        return null;
+    }
+
+    /** Always returns null. */
+    @Override
+    public DataPartition getOutputPartition() {
+        return null;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8e4cb75..1a96c13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -76,7 +76,8 @@ public class SessionVariable implements Serializable, Writable {
     public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
     public static final String ENABLE_SPILLING = "enable_spilling";
     public static final String PREFER_JOIN_METHOD = "prefer_join_method";
-    
+
+    public static final String ENABLE_ODBC_TRANSCATION = "enable_odbc_transcation";
     public static final String ENABLE_SQL_CACHE = "enable_sql_cache";
     public static final String ENABLE_PARTITION_CACHE = "enable_partition_cache";
 
@@ -235,6 +236,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT)
     private boolean enableInsertStrict = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_ODBC_TRANSCATION)
+    private boolean enableOdbcTransaction = false;
+
     @VariableMgr.VarAttr(name = ENABLE_SQL_CACHE)
     private boolean enableSqlCache = false;
 
@@ -428,6 +432,10 @@ public class SessionVariable implements Serializable, Writable {
         return enableBucketShuffleJoin;
     }
 
+    public boolean isEnableOdbcTransaction() { // session variable ENABLE_ODBC_TRANSCATION; its field is initialised to false
+        return enableOdbcTransaction;
+    }
+
     public String getPreferJoinMethod() {return preferJoinMethod; }
 
     public void setPreferJoinMethod(String preferJoinMethod) {this.preferJoinMethod = preferJoinMethod; }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 3dacff5..92e7664 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1255,6 +1255,29 @@ public class QueryPlanTest {
     }
 
     @Test
+    public void testOdbcSink() throws Exception {
+        connectContext.setDatabase("default_cluster:test");
+
+        // Current session settings: the plan must name the Doris table, its ODBC type and the external table.
+        String queryStr = "explain insert into odbc_oracle select * from odbc_mysql";
+        String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("TABLENAME IN DORIS: odbc_oracle"));
+        Assert.assertTrue(explainString.contains("TABLE TYPE: ORACLE"));
+        Assert.assertTrue(explainString.contains("TABLENAME OF EXTERNAL TABLE: tbl1"));
+
+        // Turn on the ODBC sink transaction for this check only. connectContext is also used by
+        // other tests (e.g. testPreferBroadcastJoin), so the previous value is restored afterwards.
+        boolean previous = connectContext.getSessionVariable().isEnableOdbcTransaction();
+        Deencapsulation.setField(connectContext.getSessionVariable(), "enableOdbcTransaction", true);
+        try {
+            explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+            Assert.assertTrue(explainString.contains("EnableTransaction: true"));
+        } finally {
+            Deencapsulation.setField(connectContext.getSessionVariable(), "enableOdbcTransaction", previous);
+        }
+    }
+
+    @Test
     public void testPreferBroadcastJoin() throws Exception {
         connectContext.setDatabase("default_cluster:test");
         String queryStr = "explain select * from (select k1 from jointest group by k1)t2, jointest t1 where t1.k1 = t2.k1";
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 343cbbb..ebf4371 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -31,7 +31,8 @@ enum TDataSinkType {
     MYSQL_TABLE_SINK,
     EXPORT_SINK,
     OLAP_TABLE_SINK,
-    MEMORY_SCRATCH_SINK
+    MEMORY_SCRATCH_SINK,
+    ODBC_TABLE_SINK
 }
 
 enum TResultSinkType {
@@ -82,6 +83,12 @@ struct TMysqlTableSink {
     6: required string table
 }
 
+struct TOdbcTableSink {
+    1: optional string connect_string // connect string of the ODBC external table
+    2: optional string table // external table name as produced by OdbcTable.databaseProperName on the FE
+    3: optional bool use_transaction // FE session's ODBC transaction switch when the sink was planned
+}
+
 // Following is used to split data read from 
 // Used to describe rollup schema
 struct TRollupSchema {
@@ -133,5 +140,6 @@ struct TDataSink {
   6: optional TExportSink export_sink
   7: optional TOlapTableSink olap_table_sink
   8: optional TMemoryScratchSink memory_scratch_sink
+  9: optional TOdbcTableSink odbc_table_sink
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org