Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/12/07 04:01:28 UTC

[GitHub] [incubator-doris] HappenLee opened a new pull request #5033: [ODBC] Support ODBC Sink for insert into data to ODBC external table

HappenLee opened a new pull request #5033:
URL: https://github.com/apache/incubator-doris/pull/5033


   Issue: #5031
   
   1. Refactor the ODBC scanner into an ODBC connector, and use it to add an ODBC sink for inserting data into ODBC external tables.
   2. Support transactions in the ODBC sink so that an insert is atomic.
   3. Update the ODBC sink documentation.
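Item 2 says the sink uses a transaction so that an insert is all-or-nothing. A minimal sketch of that pattern, assuming a scope guard that rolls back unless commit() succeeds, is shown below. In an ODBC sink the two callbacks would presumably wrap `SQLEndTran(SQL_HANDLE_DBC, dbc, SQL_COMMIT)` and `SQLEndTran(SQL_HANDLE_DBC, dbc, SQL_ROLLBACK)`, after autocommit has been switched off with `SQLSetConnectAttr(dbc, SQL_ATTR_AUTOCOMMIT, (SQLPOINTER)SQL_AUTOCOMMIT_OFF, 0)`. `TransactionGuard`, `FakeTable` and `write_all_or_nothing` are hypothetical names, not Doris code, and the callbacks are injected so the sketch compiles without an ODBC driver manager.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical scope guard: rolls back in the destructor unless commit() succeeded.
class TransactionGuard {
public:
    TransactionGuard(std::function<bool()> commit_fn, std::function<void()> rollback_fn)
        : _commit_fn(std::move(commit_fn)), _rollback_fn(std::move(rollback_fn)) {}
    TransactionGuard(const TransactionGuard&) = delete;
    TransactionGuard& operator=(const TransactionGuard&) = delete;

    // Returns true once the commit callback reports success; later calls are no-ops.
    bool commit() {
        if (!_committed) _committed = _commit_fn();
        return _committed;
    }

    ~TransactionGuard() {
        if (!_committed) _rollback_fn();  // error path or failed commit: undo pending work
    }

private:
    std::function<bool()> _commit_fn;
    std::function<void()> _rollback_fn;
    bool _committed = false;
};

// Stand-in for the external table: rows in `pending` become visible only on commit.
struct FakeTable {
    std::vector<std::string> committed;
    std::vector<std::string> pending;
};

// Writes every row or none; an empty string simulates a failed INSERT.
bool write_all_or_nothing(FakeTable& table, const std::vector<std::string>& rows) {
    TransactionGuard guard(
        [&table] {
            table.committed.insert(table.committed.end(), table.pending.begin(),
                                   table.pending.end());
            table.pending.clear();
            return true;
        },
        [&table] { table.pending.clear(); });
    for (const std::string& row : rows) {
        if (row.empty()) return false;  // the guard's destructor rolls back
        table.pending.push_back(row);
    }
    return guard.commit();
}
```

Because rollback lives in the destructor, every early `return` on an error path (such as a `RETURN_IF_ERROR` in a per-row loop) undoes the partial batch without extra cleanup code.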
   
   ## Proposed changes
   
   Describe the big picture of your changes here to communicate to the maintainers why we should accept this pull request. If it fixes a bug or resolves a feature request, be sure to link to that issue.
   
   ## Types of changes
   
   What types of changes does your code introduce to Doris?
   _Put an `x` in the boxes that apply_
   
   - [ ] Bugfix (non-breaking change which fixes an issue)
   - [x] New feature (non-breaking change which adds functionality)
   - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
   - [x] Documentation Update (if none of the other choices apply)
   - [x] Code refactor (Modify the code structure, format the code, etc...)
   
   ## Checklist
   
   _Put an `x` in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code._
   
   - [x] I have created an issue (Fix #ISSUE) and described the bug/feature there in detail
   - [x] Compiling and unit tests pass locally with my changes
   - [x] I have added tests that prove my fix is effective or that my feature works
   - [x] If this change needs a document change, I have updated the document
   - [x] Any dependent changes have been merged
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yangzhg commented on a change in pull request #5033: [ODBC] Support ODBC Sink for insert into data to ODBC external table

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #5033:
URL: https://github.com/apache/incubator-doris/pull/5033#discussion_r537253665



##########
File path: be/src/exec/odbc_connecter.cpp
##########
@@ -172,7 +180,175 @@ Status ODBCScanner::get_next_row(bool* eos) {
     return Status::OK();
 }
 
-Status ODBCScanner::error_status(const std::string& prefix, const std::string& error_msg) {
+Status ODBCConnecter::init_to_write() {
+    if (!_is_open) {
+        return Status::InternalError( "Init before open.");
+    }
+
+    // Allocate a statement handle
+    ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC, SQLAllocHandle(SQL_HANDLE_STMT, _dbc, &_stmt), "alloc statement");
+
+    return Status::OK();
+}
+
+Status ODBCConnecter::append(const std::string& table_name, RowBatch *batch) {
+    if (batch == nullptr || batch->num_rows() == 0) {
+        return Status::OK();
+    }
+
+    int num_rows = batch->num_rows();
+    for (int i = 0; i < num_rows; ++i) {
+        RETURN_IF_ERROR(insert_row(table_name, batch->get_row(i)));
+    }
+
+    return Status::OK();
+}
+
+Status ODBCConnecter::insert_row(const std::string& table_name, TupleRow *row) {
+    std::stringstream ss;
+
+    // Construct Insert statement of mysql
+    ss << "INSERT INTO " << table_name << " VALUES (";

Review comment:
       If this builds a MySQL statement, the table name should be surrounded by backticks (`).
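A minimal sketch of the quoting the reviewer asks for, assuming a single unqualified table name: wrap it in backticks and double any backtick inside it, which is how MySQL escapes a backtick within a quoted identifier. `quote_mysql_identifier` and `insert_prefix` are hypothetical helpers for illustration, not functions from the Doris FE or BE.

```cpp
#include <string>

// Hypothetical helper: quotes one MySQL identifier. Inside a backtick-quoted
// identifier, a literal backtick is written as two backticks.
std::string quote_mysql_identifier(const std::string& name) {
    std::string out;
    out.reserve(name.size() + 2);
    out.push_back('`');
    for (char c : name) {
        if (c == '`') out.push_back('`');  // double the embedded backtick
        out.push_back(c);
    }
    out.push_back('`');
    return out;
}

// Hypothetical: the statement prefix that insert_row builds, with the name quoted.
std::string insert_prefix(const std::string& table_name) {
    return "INSERT INTO " + quote_mysql_identifier(table_name) + " VALUES (";
}
```

A qualified name such as db.tbl needs each part quoted separately (`db`.`tbl`). Backticks are MySQL syntax; targets such as PostgreSQL or Oracle use ANSI double-quoted identifiers, so a generic ODBC sink may need the quote character to depend on the target database.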






[GitHub] [incubator-doris] yangzhg commented on a change in pull request #5033: [ODBC] Support ODBC Sink for insert into data to ODBC external table

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #5033:
URL: https://github.com/apache/incubator-doris/pull/5033#discussion_r537256214



##########
File path: gensrc/thrift/DataSinks.thrift
##########
@@ -82,6 +83,12 @@ struct TMysqlTableSink {
     6: required string table
 }
 
+struct TOdbcTableSink {
+    1: optional string connect_string
+    2: optional string table
+    3: optional bool is_transaction

Review comment:
       Maybe `use_transaction` would be a better name.






[GitHub] [incubator-doris] HappenLee commented on a change in pull request #5033: [ODBC] Support ODBC Sink for insert into data to ODBC external table

Posted by GitBox <gi...@apache.org>.
HappenLee commented on a change in pull request #5033:
URL: https://github.com/apache/incubator-doris/pull/5033#discussion_r537294675



##########
File path: be/src/exec/odbc_connecter.cpp
##########
@@ -172,7 +180,175 @@ Status ODBCScanner::get_next_row(bool* eos) {
     return Status::OK();
 }
 
-Status ODBCScanner::error_status(const std::string& prefix, const std::string& error_msg) {
+Status ODBCConnecter::init_to_write() {
+    if (!_is_open) {
+        return Status::InternalError( "Init before open.");
+    }
+
+    // Allocate a statement handle
+    ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC, SQLAllocHandle(SQL_HANDLE_STMT, _dbc, &_stmt), "alloc statement");
+
+    return Status::OK();
+}
+
+Status ODBCConnecter::append(const std::string& table_name, RowBatch *batch) {
+    if (batch == nullptr || batch->num_rows() == 0) {
+        return Status::OK();
+    }
+
+    int num_rows = batch->num_rows();
+    for (int i = 0; i < num_rows; ++i) {
+        RETURN_IF_ERROR(insert_row(table_name, batch->get_row(i)));
+    }
+
+    return Status::OK();
+}
+
+Status ODBCConnecter::insert_row(const std::string& table_name, TupleRow *row) {
+    std::stringstream ss;
+
+    // Construct Insert statement of mysql
+    ss << "INSERT INTO " << table_name << " VALUES (";

Review comment:
       I added the backtick quoting in the FE.
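Backtick quoting covers the table name only. String values are a separate case: in the VARCHAR/CHAR branch of `insert_row`, the bytes are copied verbatim between single quotes, so a value such as O'Brien would end the literal early. A minimal sketch of literal escaping follows; `to_sql_string_literal` is a hypothetical helper, and its `(ptr, len)` parameters simply mirror the `StringValue` fields used in that branch.

```cpp
#include <string>

// Hypothetical helper: renders bytes as a single-quoted SQL string literal.
// Doubling ' is standard SQL. MySQL in its default sql_mode also treats '\'
// as an escape character, so pass escape_backslash = true there, and false
// when NO_BACKSLASH_ESCAPES is enabled or the target follows standard SQL.
std::string to_sql_string_literal(const char* ptr, int len, bool escape_backslash) {
    std::string out = "'";
    for (int j = 0; j < len; ++j) {
        char c = ptr[j];
        if (c == '\'') {
            out += "''";          // embedded quote -> doubled quote
        } else if (c == '\\' && escape_backslash) {
            out += "\\\\";        // one backslash -> two backslashes
        } else {
            out.push_back(c);
        }
    }
    out.push_back('\'');
    return out;
}
```

Escaping rules differ between databases, which is one reason ODBC code often binds values with `SQLPrepare` and `SQLBindParameter` instead of formatting literals; this sketch keeps the string-building approach that `insert_row` uses.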






[GitHub] [incubator-doris] yangzhg commented on a change in pull request #5033: [ODBC] Support ODBC Sink for insert into data to ODBC external table

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #5033:
URL: https://github.com/apache/incubator-doris/pull/5033#discussion_r538948912



##########
File path: be/src/exec/odbc_connector.cpp
##########
@@ -48,18 +50,24 @@ static std::u16string utf8_to_wstring(const std::string& str) {
 
 namespace doris {
 
-ODBCScanner::ODBCScanner(const ODBCScannerParam& param)
+ODBCConnector::ODBCConnector(const ODBCConneterParam& param)

Review comment:
       ```suggestion
   ODBCConnector::ODBCConnector(const ODBCConnectorParam& param)
   ```






[GitHub] [incubator-doris] yangzhg commented on a change in pull request #5033: [ODBC] Support ODBC Sink for insert into data to ODBC external table

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #5033:
URL: https://github.com/apache/incubator-doris/pull/5033#discussion_r537255093



##########
File path: be/src/exec/odbc_connecter.h
##########
@@ -52,32 +58,46 @@ struct DataBinding : public boost::noncopyable {
     ~DataBinding() { free(target_value_ptr); }
 };
 
-// ODBC Scanner for scan data from ODBC
-class ODBCScanner {
+// ODBC Connecter for scan data from ODBC
+class ODBCConnecter {

Review comment:
       ```suggestion
   class ODBCConnector {
   ```






[GitHub] [incubator-doris] yangzhg commented on a change in pull request #5033: [ODBC] Support ODBC Sink for insert into data to ODBC external table

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #5033:
URL: https://github.com/apache/incubator-doris/pull/5033#discussion_r537254626



##########
File path: be/src/exec/odbc_connecter.cpp
##########
@@ -172,7 +180,175 @@ Status ODBCScanner::get_next_row(bool* eos) {
     return Status::OK();
 }
 
-Status ODBCScanner::error_status(const std::string& prefix, const std::string& error_msg) {
+Status ODBCConnecter::init_to_write() {
+    if (!_is_open) {
+        return Status::InternalError( "Init before open.");
+    }
+
+    // Allocate a statement handle
+    ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC, SQLAllocHandle(SQL_HANDLE_STMT, _dbc, &_stmt), "alloc statement");
+
+    return Status::OK();
+}
+
+Status ODBCConnecter::append(const std::string& table_name, RowBatch *batch) {
+    if (batch == nullptr || batch->num_rows() == 0) {
+        return Status::OK();
+    }
+
+    int num_rows = batch->num_rows();
+    for (int i = 0; i < num_rows; ++i) {
+        RETURN_IF_ERROR(insert_row(table_name, batch->get_row(i)));
+    }
+
+    return Status::OK();
+}
+
+Status ODBCConnecter::insert_row(const std::string& table_name, TupleRow *row) {
+    std::stringstream ss;
+
+    // Construct Insert statement of mysql
+    ss << "INSERT INTO " << table_name << " VALUES (";
+    int num_columns = _output_expr_ctxs.size();
+    for (int i = 0; i < num_columns; ++i) {
+        if (i != 0) {
+            ss << ", ";
+        }
+        void* item = _output_expr_ctxs[i]->get_value(row);
+        if (item == nullptr) {
+            ss << "NULL";
+            continue;
+        }
+        switch (_output_expr_ctxs[i]->root()->type().type) {
+            case TYPE_BOOLEAN:
+            case TYPE_TINYINT:
+                ss << (int)*static_cast<int8_t*>(item);
+                break;
+            case TYPE_SMALLINT:
+                ss << *static_cast<int16_t*>(item);
+                break;
+            case TYPE_INT:
+                ss << *static_cast<int32_t*>(item);
+                break;
+            case TYPE_BIGINT:
+                ss << *static_cast<int64_t*>(item);
+                break;
+            case TYPE_FLOAT:
+                ss << *static_cast<float*>(item);
+                break;
+            case TYPE_DOUBLE:
+                ss << *static_cast<double*>(item);
+                break;
+            case TYPE_DATE:
+            case TYPE_DATETIME: {
+                char buf[64];
+                const DateTimeValue* time_val = (const DateTimeValue*)(item);
+                time_val->to_string(buf);
+                ss << "\'" << buf << "\'";
+                break;
+            }
+            case TYPE_VARCHAR:
+            case TYPE_CHAR: {
+                const StringValue* string_val = (const StringValue*)(item);
+
+                if (string_val->ptr == NULL) {
+                    if (string_val->len == 0) {
+                        ss << "\'\'";
+                    } else {
+                        ss << "NULL";
+                    }
+                } else {
+                    ss << "\'";
+                    for (int j = 0; j < string_val->len ; ++j) {
+                        ss << string_val->ptr[j];
+                    }
+                    ss << "\'";
+                }
+                break;
+            }
+            case TYPE_DECIMAL: {
+                const DecimalValue* decimal_val = reinterpret_cast<const DecimalValue*>(item);
+                std::string decimal_str;
+                int output_scale = _output_expr_ctxs[i]->root()->output_scale();
+
+                if (output_scale > 0 && output_scale <= 30) {
+                    decimal_str = decimal_val->to_string(output_scale);
+                } else {
+                    decimal_str = decimal_val->to_string();
+                }
+                ss << decimal_str;
+                break;
+            }
+            case TYPE_DECIMALV2: {
+                const DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128*>(item)->value);
+                std::string decimal_str;
+                int output_scale = _output_expr_ctxs[i]->root()->output_scale();
+
+                if (output_scale > 0 && output_scale <= 30) {
+                    decimal_str = decimal_val.to_string(output_scale);
+                } else {
+                    decimal_str = decimal_val.to_string();
+                }
+                ss << decimal_str;
+                break;
+            }
+
+            default: {
+                std::stringstream err_ss;
+                err_ss << "can't convert this type to mysql type. type = " <<
+                       _output_expr_ctxs[i]->root()->type();
+                return Status::InternalError(err_ss.str());
+            }
+        }
+    }
+    ss << ")";
+
+    // Translate utf8 string to utf16 to use unicode codeing

Review comment:
       ```suggestion
       // Translate utf8 string to utf16 to use unicode encoding
   ```
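The commented step converts the UTF-8 INSERT statement to UTF-16, presumably for the wide-character (`W`) ODBC entry points such as `SQLExecDirectW`. The diff references a helper `static std::u16string utf8_to_wstring(const std::string& str)`, but its body is not shown, so the following is an independent sketch, not the Doris implementation. `utf8_to_utf16` is a hypothetical name, and this version throws on malformed input (bad lead or continuation bytes, truncation, overlong forms, surrogate code points, values above U+10FFFF) instead of substituting U+FFFD.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>

// Hypothetical UTF-8 -> UTF-16 decoder; code points above U+FFFF become surrogate pairs.
std::u16string utf8_to_utf16(const std::string& in) {
    static const char32_t kMinForLength[] = {0, 0x80, 0x800, 0x10000};  // overlong check
    std::u16string out;
    out.reserve(in.size());
    std::size_t i = 0;
    while (i < in.size()) {
        unsigned char c = static_cast<unsigned char>(in[i]);
        char32_t cp;
        std::size_t extra;  // number of continuation bytes
        if (c < 0x80)                { cp = c;        extra = 0; }
        else if ((c & 0xE0) == 0xC0) { cp = c & 0x1F; extra = 1; }
        else if ((c & 0xF0) == 0xE0) { cp = c & 0x0F; extra = 2; }
        else if ((c & 0xF8) == 0xF0) { cp = c & 0x07; extra = 3; }
        else throw std::invalid_argument("invalid UTF-8 lead byte");

        if (extra > in.size() - i - 1) throw std::invalid_argument("truncated UTF-8 sequence");
        for (std::size_t k = 1; k <= extra; ++k) {
            unsigned char cc = static_cast<unsigned char>(in[i + k]);
            if ((cc & 0xC0) != 0x80) throw std::invalid_argument("bad continuation byte");
            cp = (cp << 6) | (cc & 0x3F);
        }
        if (cp < kMinForLength[extra] || (cp >= 0xD800 && cp <= 0xDFFF) || cp > 0x10FFFF) {
            throw std::invalid_argument("invalid code point");
        }

        if (cp < 0x10000) {
            out.push_back(static_cast<char16_t>(cp));
        } else {
            cp -= 0x10000;  // split the remaining 20 bits across a surrogate pair
            out.push_back(static_cast<char16_t>(0xD800 + (cp >> 10)));
            out.push_back(static_cast<char16_t>(0xDC00 + (cp & 0x3FF)));
        }
        i += extra + 1;
    }
    return out;
}
```

A hand-written decoder is used here because `std::wstring_convert` and `std::codecvt_utf8_utf16` are deprecated since C++17.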






[GitHub] [incubator-doris] morningman merged pull request #5033: [ODBC] Support ODBC Sink for insert into data to ODBC external table

Posted by GitBox <gi...@apache.org>.
morningman merged pull request #5033:
URL: https://github.com/apache/incubator-doris/pull/5033


   



