You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/10/26 16:02:32 UTC

[doris] branch master updated: [Bug](jdbc) Fix memory leak for JDBC datasource (#13657)

This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c95106d45 [Bug](jdbc) Fix memory leak for JDBC datasource (#13657)
3c95106d45 is described below

commit 3c95106d45163fd0c9c7df769b8d711b9c2f7cdf
Author: Gabriel <ga...@gmail.com>
AuthorDate: Thu Oct 27 00:02:25 2022 +0800

    [Bug](jdbc) Fix memory leak for JDBC datasource (#13657)
---
 be/src/common/config.h                             |   2 +
 be/src/exec/odbc_connector.cpp                     |   2 +-
 be/src/exec/odbc_connector.h                       |   2 +-
 be/src/exec/odbc_scan_node.cpp                     |   2 +-
 be/src/exec/table_connector.h                      |   4 +-
 be/src/runtime/odbc_table_sink.cpp                 |   2 +-
 be/src/util/jni-util.cpp                           |  14 +-
 be/src/vec/exec/scan/new_jdbc_scanner.cpp          |   3 +-
 be/src/vec/exec/scan/new_odbc_scanner.cpp          |   2 +-
 be/src/vec/exec/vjdbc_connector.cpp                | 151 +++++++++++----------
 be/src/vec/exec/vjdbc_connector.h                  |   8 +-
 be/src/vec/exec/vjdbc_scan_node.cpp                |   2 +-
 be/src/vec/exec/vodbc_scan_node.cpp                |   2 +-
 be/src/vec/sink/vjdbc_table_sink.cpp               |   3 +-
 be/src/vec/sink/vodbc_table_sink.cpp               |   2 +-
 dist/LICENSE-dist.txt                              |   2 +-
 fe/java-udf/pom.xml                                |   5 +
 .../java/org/apache/doris/udf/JdbcExecutor.java    |  70 ++++++----
 fe/pom.xml                                         |   1 +
 gensrc/thrift/Types.thrift                         |  12 +-
 20 files changed, 175 insertions(+), 116 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index b30cc1b005..8a690c2c1d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -865,6 +865,8 @@ CONF_String(be_node_role, "mix");
 // Hide the be config page for webserver.
 CONF_Bool(hide_webserver_config_page, "false");
 
+CONF_String(jvm_max_heap_size, "1024M");
+
 #ifdef BE_TEST
 // test s3
 CONF_String(test_s3_resource, "resource");
diff --git a/be/src/exec/odbc_connector.cpp b/be/src/exec/odbc_connector.cpp
index 0ffedf4f9a..826d8b8bb9 100644
--- a/be/src/exec/odbc_connector.cpp
+++ b/be/src/exec/odbc_connector.cpp
@@ -70,7 +70,7 @@ ODBCConnector::~ODBCConnector() {
     }
 }
 
-Status ODBCConnector::open() {
+Status ODBCConnector::open(RuntimeState* state, bool read) {
     if (_is_open) {
         LOG(INFO) << "this scanner already opened";
         return Status::OK();
diff --git a/be/src/exec/odbc_connector.h b/be/src/exec/odbc_connector.h
index 41c7280741..2aabcb61b6 100644
--- a/be/src/exec/odbc_connector.h
+++ b/be/src/exec/odbc_connector.h
@@ -54,7 +54,7 @@ public:
     explicit ODBCConnector(const ODBCConnectorParam& param);
     ~ODBCConnector() override;
 
-    Status open() override;
+    Status open(RuntimeState* state, bool read = false) override;
     // query for ODBC table
     Status query() override;
 
diff --git a/be/src/exec/odbc_scan_node.cpp b/be/src/exec/odbc_scan_node.cpp
index c0cfd53bb4..84194c7510 100644
--- a/be/src/exec/odbc_scan_node.cpp
+++ b/be/src/exec/odbc_scan_node.cpp
@@ -106,7 +106,7 @@ Status OdbcScanNode::open(RuntimeState* state) {
     }
 
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(_odbc_scanner->open());
+    RETURN_IF_ERROR(_odbc_scanner->open(state));
     RETURN_IF_ERROR(_odbc_scanner->query());
     // check materialize slot num
 
diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h
index 06bdadd380..3fa9f5f5b1 100644
--- a/be/src/exec/table_connector.h
+++ b/be/src/exec/table_connector.h
@@ -38,7 +38,7 @@ public:
     TableConnector(const TupleDescriptor* tuple_desc, const std::string& sql_str);
     virtual ~TableConnector() = default;
 
-    virtual Status open() = 0;
+    virtual Status open(RuntimeState* state, bool read = false) = 0;
     // exec query for table
     virtual Status query() = 0;
 
@@ -64,6 +64,8 @@ public:
 
     std::u16string utf8_to_u16string(const char* first, const char* last);
 
+    virtual Status close() { return Status::OK(); }
+
 protected:
     bool _is_open;
     bool _is_in_transaction;
diff --git a/be/src/runtime/odbc_table_sink.cpp b/be/src/runtime/odbc_table_sink.cpp
index 3191431138..a7c58d22d4 100644
--- a/be/src/runtime/odbc_table_sink.cpp
+++ b/be/src/runtime/odbc_table_sink.cpp
@@ -65,7 +65,7 @@ Status OdbcTableSink::open(RuntimeState* state) {
     RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));
     // create writer
     _writer.reset(new ODBCConnector(_odbc_param));
-    RETURN_IF_ERROR(_writer->open());
+    RETURN_IF_ERROR(_writer->open(state));
     if (_use_transaction) {
         RETURN_IF_ERROR(_writer->begin_trans());
     }
diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index 384af3e414..e9a0673dc8 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -22,6 +22,7 @@
 #include <jni_md.h>
 #include <stdlib.h>
 
+#include "common/config.h"
 #include "gutil/once.h"
 #include "gutil/strings/substitute.h"
 
@@ -39,13 +40,16 @@ void FindOrCreateJavaVM() {
     if (rv == 0) {
         JNIEnv* env;
         JavaVMInitArgs vm_args;
-        JavaVMOption options[1];
-        char* str = getenv("DORIS_JNI_CLASSPATH_PARAMETER");
-        options[0].optionString = str;
+        JavaVMOption options[2];
+        char* cp = getenv("DORIS_JNI_CLASSPATH_PARAMETER");
+        options[0].optionString = cp;
+        std::string heap_size = fmt::format("-Xmx{}", config::jvm_max_heap_size);
+        options[1].optionString = const_cast<char*>(heap_size.c_str());
         vm_args.version = JNI_VERSION_1_8;
         vm_args.options = options;
-        vm_args.nOptions = 1;
-        vm_args.ignoreUnrecognized = JNI_TRUE;
+        vm_args.nOptions = 2;
+        // Set it to JNI_FALSE because JNI_TRUE lets the JVM ignore the -Xmx max heap size option.
+        vm_args.ignoreUnrecognized = JNI_FALSE;
 
         jint res = JNI_CreateJavaVM(&g_vm, (void**)&env, &vm_args);
         if (JNI_OK != res) {
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index 61c9ff53c5..2141d6b7e8 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -81,7 +81,7 @@ Status NewJdbcScanner::open(RuntimeState* state) {
     }
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(VScanner::open(state));
-    RETURN_IF_ERROR(_jdbc_connector->open());
+    RETURN_IF_ERROR(_jdbc_connector->open(state, true));
     RETURN_IF_ERROR(_jdbc_connector->query());
     return Status::OK();
 }
@@ -147,6 +147,7 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
 
 Status NewJdbcScanner::close(RuntimeState* state) {
     RETURN_IF_ERROR(VScanner::close(state));
+    RETURN_IF_ERROR(_jdbc_connector->close());
     return Status::OK();
 }
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp
index a69ecb1f32..59c94cf887 100644
--- a/be/src/vec/exec/scan/new_odbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp
@@ -88,7 +88,7 @@ Status NewOdbcScanner::open(RuntimeState* state) {
 
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(VScanner::open(state));
-    RETURN_IF_ERROR(_odbc_connector->open());
+    RETURN_IF_ERROR(_odbc_connector->open(state));
     RETURN_IF_ERROR(_odbc_connector->query());
     // check materialize slot num
 
diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index 2551fd88c2..5cff86bf46 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -17,6 +17,7 @@
 
 #include "vec/exec/vjdbc_connector.h"
 #ifdef LIBJVM
+#include "common/status.h"
 #include "exec/table_connector.h"
 #include "gen_cpp/Types_types.h"
 #include "gutil/strings/substitute.h"
@@ -30,7 +31,7 @@ namespace doris {
 namespace vectorized {
 const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/udf/JdbcExecutor";
 const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V";
-const char* JDBC_EXECUTOR_QUERYSQL_SIGNATURE = "(Ljava/lang/String;)I";
+const char* JDBC_EXECUTOR_WRITE_SIGNATURE = "(Ljava/lang/String;)I";
 const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";
 const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;";
 const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V";
@@ -39,24 +40,50 @@ const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE = "(Ljava/lang/Object;)J";
 const char* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V";
 
 JdbcConnector::JdbcConnector(const JdbcConnectorParam& param)
-        : TableConnector(param.tuple_desc, param.query_string), _conn_param(param) {}
+        : TableConnector(param.tuple_desc, param.query_string),
+          _conn_param(param),
+          _closed(false) {}
 
 JdbcConnector::~JdbcConnector() {
+    if (!_closed) {
+        close();
+    }
+}
+
+#define GET_BASIC_JAVA_CLAZZ(JAVA_TYPE, CPP_TYPE) \
+    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, JAVA_TYPE, &_executor_##CPP_TYPE##_clazz));
+
+#define DELETE_BASIC_JAVA_CLAZZ_REF(CPP_TYPE) env->DeleteGlobalRef(_executor_##CPP_TYPE##_clazz);
+
+Status JdbcConnector::close() {
+    _closed = true;
     if (!_is_open) {
-        return;
+        return Status::OK();
     }
     if (_is_in_transaction) {
-        abort_trans();
+        RETURN_IF_ERROR(abort_trans());
     }
     JNIEnv* env;
-    Status status;
-    RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+    env->DeleteGlobalRef(_executor_clazz);
+    DELETE_BASIC_JAVA_CLAZZ_REF(object)
+    DELETE_BASIC_JAVA_CLAZZ_REF(uint8_t)
+    DELETE_BASIC_JAVA_CLAZZ_REF(int8_t)
+    DELETE_BASIC_JAVA_CLAZZ_REF(int16_t)
+    DELETE_BASIC_JAVA_CLAZZ_REF(int32_t)
+    DELETE_BASIC_JAVA_CLAZZ_REF(int64_t)
+    DELETE_BASIC_JAVA_CLAZZ_REF(float)
+    DELETE_BASIC_JAVA_CLAZZ_REF(double)
+    DELETE_BASIC_JAVA_CLAZZ_REF(string)
+    DELETE_BASIC_JAVA_CLAZZ_REF(list)
+#undef DELETE_BASIC_JAVA_CLAZZ_REF
     env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_close_id);
-    RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
     env->DeleteGlobalRef(_executor_obj);
+    return Status::OK();
 }
 
-Status JdbcConnector::open() {
+Status JdbcConnector::open(RuntimeState* state, bool read) {
     if (_is_open) {
         LOG(INFO) << "this scanner of jdbc already opened";
         return Status::OK();
@@ -65,16 +92,19 @@ Status JdbcConnector::open() {
     JNIEnv* env = nullptr;
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
     RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, JDBC_EXECUTOR_CLASS, &_executor_clazz));
-    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/util/List", &_executor_list_clazz));
-    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Object", &_executor_object_clazz));
-    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Boolean", &_executor_uint8_t_clazz));
-    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Byte", &_executor_int8_t_clazz));
-    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Short", &_executor_int16_t_clazz));
-    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Integer", &_executor_int32_t_clazz));
-    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Long", &_executor_int64_t_clazz));
-    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Float", &_executor_float_clazz));
-    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Float", &_executor_double_clazz));
-    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/String", &_executor_string_clazz));
+
+    GET_BASIC_JAVA_CLAZZ("java/util/List", list)
+    GET_BASIC_JAVA_CLAZZ("java/lang/Object", object)
+    GET_BASIC_JAVA_CLAZZ("java/lang/Boolean", uint8_t)
+    GET_BASIC_JAVA_CLAZZ("java/lang/Byte", int8_t)
+    GET_BASIC_JAVA_CLAZZ("java/lang/Short", int16_t)
+    GET_BASIC_JAVA_CLAZZ("java/lang/Integer", int32_t)
+    GET_BASIC_JAVA_CLAZZ("java/lang/Long", int64_t)
+    GET_BASIC_JAVA_CLAZZ("java/lang/Float", float)
+    GET_BASIC_JAVA_CLAZZ("java/lang/Float", double)
+    GET_BASIC_JAVA_CLAZZ("java/lang/String", string)
+
+#undef GET_BASIC_JAVA_CLAZZ
     RETURN_IF_ERROR(_register_func_id(env));
 
     // Add a scoped cleanup jni reference object. This cleans up local refs made below.
@@ -87,17 +117,23 @@ Status JdbcConnector::open() {
                 std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path,
                 _conn_param.driver_checksum, &local_location));
         TJdbcExecutorCtorParams ctor_params;
-        ctor_params.__set_jar_location_path(local_location);
+        ctor_params.__set_statement(_sql_str);
         ctor_params.__set_jdbc_url(_conn_param.jdbc_url);
         ctor_params.__set_jdbc_user(_conn_param.user);
         ctor_params.__set_jdbc_password(_conn_param.passwd);
         ctor_params.__set_jdbc_driver_class(_conn_param.driver_class);
+        ctor_params.__set_batch_size(read ? state->batch_size() : 0);
+        ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE);
 
         jbyteArray ctor_params_bytes;
         // Pushed frame will be popped when jni_frame goes out-of-scope.
         RETURN_IF_ERROR(jni_frame.push(env));
         RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
         _executor_obj = env->NewObject(_executor_clazz, _executor_ctor_id, ctor_params_bytes);
+
+        jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr);
+        env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT);
+        env->DeleteLocalRef(ctor_params_bytes);
     }
     RETURN_ERROR_IF_EXC(env);
     RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, _executor_obj, &_executor_obj));
@@ -119,10 +155,8 @@ Status JdbcConnector::query() {
 
     JNIEnv* env = nullptr;
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
-    jstring query_sql = env->NewStringUTF(_sql_str.c_str());
-    jint colunm_count = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz,
-                                                     _executor_query_id, query_sql);
-    env->DeleteLocalRef(query_sql);
+    jint colunm_count =
+            env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_read_id);
     RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
 
     if (colunm_count != materialize_num) {
@@ -164,10 +198,14 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
         for (int row = 0; row < num_rows; ++row) {
             jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row);
             _convert_column_data(env, cur_data, slot_desc, columns[column_index].get());
+            env->DeleteLocalRef(cur_data);
         }
+        env->DeleteLocalRef(column_data);
 
         materialized_column_index++;
     }
+    // All Java objects returned by JNI functions are local references.
+    env->DeleteLocalRef(block_obj);
     return JniUtil::GetJniExceptionMsg(env);
 }
 
@@ -186,8 +224,9 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
 
     RETURN_IF_ERROR(register_id(_executor_clazz, "<init>", JDBC_EXECUTOR_CTOR_SIGNATURE,
                                 _executor_ctor_id));
-    RETURN_IF_ERROR(register_id(_executor_clazz, "querySQL", JDBC_EXECUTOR_QUERYSQL_SIGNATURE,
-                                _executor_query_id));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "write", JDBC_EXECUTOR_WRITE_SIGNATURE,
+                                _executor_write_id));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "read", "()I", _executor_read_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "close", JDBC_EXECUTOR_CLOSE_SIGNATURE,
                                 _executor_close_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE,
@@ -232,48 +271,20 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj,
     }
 
     switch (slot_desc->type().type) {
-    case TYPE_BOOLEAN: {
-        uint8_t num = _jobject_to_uint8_t(env, jobj);
-        reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(
-                (uint8_t)num);
-        break;
-    }
-    case TYPE_TINYINT: {
-        int8_t num = _jobject_to_int8_t(env, jobj);
-        reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(num);
-        break;
+#define M(TYPE, CPP_TYPE, COLUMN_TYPE)                              \
+    case TYPE: {                                                    \
+        CPP_TYPE num = _jobject_to_##CPP_TYPE(env, jobj);           \
+        reinterpret_cast<COLUMN_TYPE*>(col_ptr)->insert_value(num); \
+        break;                                                      \
     }
-    case TYPE_SMALLINT: {
-        int16_t num = _jobject_to_int16_t(env, jobj);
-        reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(num);
-        break;
-    }
-
-    case TYPE_INT: {
-        int32_t num = _jobject_to_int32_t(env, jobj);
-        reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(num);
-        break;
-    }
-
-    case TYPE_BIGINT: {
-        int64_t num = _jobject_to_int64_t(env, jobj);
-        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num);
-        break;
-    }
-
-    case TYPE_FLOAT: {
-        float num = _jobject_to_float(env, jobj);
-        reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(
-                num);
-        break;
-    }
-    case TYPE_DOUBLE: {
-        double num = _jobject_to_double(env, jobj);
-        reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
-                num);
-        break;
-    }
-
+        M(TYPE_BOOLEAN, uint8_t, vectorized::ColumnVector<vectorized::UInt8>)
+        M(TYPE_TINYINT, int8_t, vectorized::ColumnVector<vectorized::Int8>)
+        M(TYPE_SMALLINT, int16_t, vectorized::ColumnVector<vectorized::Int16>)
+        M(TYPE_INT, int32_t, vectorized::ColumnVector<vectorized::Int32>)
+        M(TYPE_BIGINT, int64_t, vectorized::ColumnVector<vectorized::Int64>)
+        M(TYPE_FLOAT, float, vectorized::ColumnVector<vectorized::Float32>)
+        M(TYPE_DOUBLE, double, vectorized::ColumnVector<vectorized::Float64>)
+#undef M
     case TYPE_STRING:
     case TYPE_CHAR:
     case TYPE_VARCHAR: {
@@ -317,7 +328,7 @@ Status JdbcConnector::exec_write_sql(const std::u16string& insert_stmt,
     JNIEnv* env = nullptr;
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
     jstring query_sql = env->NewString((const jchar*)insert_stmt.c_str(), insert_stmt.size());
-    env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_query_id, query_sql);
+    env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_write_id, query_sql);
     env->DeleteLocalRef(query_sql);
     RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
     return Status::OK();
@@ -325,13 +336,15 @@ Status JdbcConnector::exec_write_sql(const std::u16string& insert_stmt,
 
 std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) {
     jobject jstr = env->CallObjectMethod(jobj, _to_string_id);
-    const jbyteArray stringJbytes =
-            (jbyteArray)env->CallObjectMethod(jstr, _get_bytes_id, env->NewStringUTF("UTF-8"));
+    auto coding = env->NewStringUTF("UTF-8");
+    const jbyteArray stringJbytes = (jbyteArray)env->CallObjectMethod(jstr, _get_bytes_id, coding);
     size_t length = (size_t)env->GetArrayLength(stringJbytes);
     jbyte* pBytes = env->GetByteArrayElements(stringJbytes, nullptr);
     std::string str = std::string((char*)pBytes, length);
     env->ReleaseByteArrayElements(stringJbytes, pBytes, JNI_ABORT);
     env->DeleteLocalRef(stringJbytes);
+    env->DeleteLocalRef(jstr);
+    env->DeleteLocalRef(coding);
     return str;
 }
 
diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h
index 04aa8982da..e6f957746f 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -42,7 +42,7 @@ public:
 
     ~JdbcConnector() override;
 
-    Status open() override;
+    Status open(RuntimeState* state, bool read = false) override;
 
     Status query() override;
 
@@ -56,6 +56,8 @@ public:
     Status abort_trans() override; // should be call after transaction abort
     Status finish_trans() override; // should be call after transaction commit
 
+    Status close() override;
+
 private:
     Status _register_func_id(JNIEnv* env);
     Status _convert_column_data(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc,
@@ -65,13 +67,15 @@ private:
     int64_t _jobject_to_datetime(JNIEnv* env, jobject jobj);
 
     const JdbcConnectorParam& _conn_param;
+    bool _closed;
     jclass _executor_clazz;
     jclass _executor_list_clazz;
     jclass _executor_object_clazz;
     jclass _executor_string_clazz;
     jobject _executor_obj;
     jmethodID _executor_ctor_id;
-    jmethodID _executor_query_id;
+    jmethodID _executor_write_id;
+    jmethodID _executor_read_id;
     jmethodID _executor_has_next_id;
     jmethodID _executor_get_blocks_id;
     jmethodID _executor_close_id;
diff --git a/be/src/vec/exec/vjdbc_scan_node.cpp b/be/src/vec/exec/vjdbc_scan_node.cpp
index e857218e97..7d15603992 100644
--- a/be/src/vec/exec/vjdbc_scan_node.cpp
+++ b/be/src/vec/exec/vjdbc_scan_node.cpp
@@ -91,7 +91,7 @@ Status VJdbcScanNode::open(RuntimeState* state) {
     }
 
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(_jdbc_connector->open());
+    RETURN_IF_ERROR(_jdbc_connector->open(state, true));
     RETURN_IF_ERROR(_jdbc_connector->query());
     return Status::OK();
 }
diff --git a/be/src/vec/exec/vodbc_scan_node.cpp b/be/src/vec/exec/vodbc_scan_node.cpp
index 7bdc44501f..994eedc926 100644
--- a/be/src/vec/exec/vodbc_scan_node.cpp
+++ b/be/src/vec/exec/vodbc_scan_node.cpp
@@ -100,7 +100,7 @@ Status VOdbcScanNode::open(RuntimeState* state) {
     }
 
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(_odbc_scanner->open());
+    RETURN_IF_ERROR(_odbc_scanner->open(state));
     RETURN_IF_ERROR(_odbc_scanner->query());
     // check materialize slot num
 
diff --git a/be/src/vec/sink/vjdbc_table_sink.cpp b/be/src/vec/sink/vjdbc_table_sink.cpp
index b4ad942a3d..4c428aa47b 100644
--- a/be/src/vec/sink/vjdbc_table_sink.cpp
+++ b/be/src/vec/sink/vjdbc_table_sink.cpp
@@ -58,7 +58,7 @@ Status VJdbcTableSink::open(RuntimeState* state) {
 
     // create writer
     _writer.reset(new JdbcConnector(_jdbc_param));
-    RETURN_IF_ERROR(_writer->open());
+    RETURN_IF_ERROR(_writer->open(state, false));
     if (_use_transaction) {
         RETURN_IF_ERROR(_writer->begin_trans());
     }
@@ -96,6 +96,7 @@ Status VJdbcTableSink::close(RuntimeState* state, Status exec_status) {
     if (exec_status.ok() && _use_transaction) {
         RETURN_IF_ERROR(_writer->finish_trans());
     }
+    RETURN_IF_ERROR(_writer->close());
     return DataSink::close(state, exec_status);
 }
 } // namespace vectorized
diff --git a/be/src/vec/sink/vodbc_table_sink.cpp b/be/src/vec/sink/vodbc_table_sink.cpp
index 695c06ac68..8d6a0596b2 100644
--- a/be/src/vec/sink/vodbc_table_sink.cpp
+++ b/be/src/vec/sink/vodbc_table_sink.cpp
@@ -48,7 +48,7 @@ Status VOdbcTableSink::open(RuntimeState* state) {
 
     // create writer
     _writer.reset(new ODBCConnector(_odbc_param));
-    RETURN_IF_ERROR(_writer->open());
+    RETURN_IF_ERROR(_writer->open(state));
     if (_use_transaction) {
         RETURN_IF_ERROR(_writer->begin_trans());
     }
diff --git a/dist/LICENSE-dist.txt b/dist/LICENSE-dist.txt
index 4403b5a519..02888b004b 100644
--- a/dist/LICENSE-dist.txt
+++ b/dist/LICENSE-dist.txt
@@ -882,7 +882,7 @@ The Apache Software License, Version 2.0
     * Hibernate Validator Engine:
         - org.hibernate:hibernate-validator:5.1.0.Final (http://validator.hibernate.org/hibernate-validator)
     * HikariCP:
-        - com.zaxxer:HikariCP:4.0.3 (https://github.com/brettwooldridge/HikariCP)
+        - com.zaxxer:HikariCP:3.4.5 (https://github.com/brettwooldridge/HikariCP)
     * Hive Common:
         - org.apache.hive:hive-common:2.3.7 (https://hive.apache.org/hive-common)
     * Hive Llap Client:
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index dbcebd9ced..3b0595442d 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -70,6 +70,11 @@ under the License.
             <version>${junit.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.zaxxer</groupId>
+            <artifactId>HikariCP</artifactId>
+            <version>${hikaricp.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <finalName>java-udf</finalName>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index 4936bc5be3..49da4332c7 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -19,17 +19,20 @@ package org.apache.doris.udf;
 
 
 import org.apache.doris.thrift.TJdbcExecutorCtorParams;
+import org.apache.doris.thrift.TJdbcOperation;
 
+import com.google.common.base.Preconditions;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 
-import java.net.MalformedURLException;
-import java.net.URLClassLoader;
 import java.sql.Connection;
 import java.sql.Date;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -43,11 +46,12 @@ import java.util.List;
 public class JdbcExecutor {
     private static final Logger LOG = Logger.getLogger(JdbcExecutor.class);
     private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
-    private URLClassLoader classLoader = null;
     private Connection conn = null;
     private Statement stmt = null;
     private ResultSet resultSet = null;
     private ResultSetMetaData resultSetMetaData = null;
+    // Use HikariDataSource to help us manage the JDBC connections.
+    private HikariDataSource dataSource = null;
 
     public JdbcExecutor(byte[] thriftParams) throws Exception {
         TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -57,8 +61,8 @@ public class JdbcExecutor {
         } catch (TException e) {
             throw new InternalException(e.getMessage());
         }
-        init(request.jar_location_path, request.jdbc_driver_class, request.jdbc_url, request.jdbc_user,
-                request.jdbc_password);
+        init(request.statement, request.batch_size, request.jdbc_driver_class, request.jdbc_url, request.jdbc_user,
+                request.jdbc_password, request.op);
     }
 
     public void close() throws Exception {
@@ -71,12 +75,26 @@ public class JdbcExecutor {
         if (conn != null) {
             conn.close();
         }
-        if (classLoader != null) {
-            classLoader.close();
+        if (dataSource != null) {
+            dataSource.close();
         }
+        resultSet = null;
+        stmt = null;
+        conn = null;
+        dataSource = null;
     }
 
-    public int querySQL(String sql) throws UdfRuntimeException {
+    public int read() throws UdfRuntimeException {
+        try {
+            resultSet = ((PreparedStatement) stmt).executeQuery();
+            resultSetMetaData = resultSet.getMetaData();
+            return resultSetMetaData.getColumnCount();
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("JDBC executor sql has error: ", e);
+        }
+    }
+
+    public int write(String sql) throws UdfRuntimeException {
         try {
             boolean res = stmt.execute(sql);
             if (res) { // sql query
@@ -175,28 +193,28 @@ public class JdbcExecutor {
         return time;
     }
 
-    private void init(String driverPath, String driverClass, String jdbcUrl, String jdbcUser, String jdbcPassword)
-            throws UdfRuntimeException {
+    private void init(String sql, int batchSize, String driverClass, String jdbcUrl, String jdbcUser,
+            String jdbcPassword, TJdbcOperation op) throws UdfRuntimeException {
         try {
-            ClassLoader loader;
-            if (driverPath != null) {
-                ClassLoader parent = getClass().getClassLoader();
-                classLoader = UdfUtils.getClassLoader(driverPath, parent);
-                loader = classLoader;
+            HikariConfig config = new HikariConfig();
+            config.setDriverClassName(driverClass);
+            config.setJdbcUrl(jdbcUrl);
+            config.setUsername(jdbcUser);
+            config.setPassword(jdbcPassword);
+            config.setMaximumPoolSize(1);
+
+            dataSource = new HikariDataSource(config);
+            conn = dataSource.getConnection();
+            conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
+            if (op == TJdbcOperation.READ) {
+                Preconditions.checkArgument(sql != null);
+                stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+                stmt.setFetchSize(batchSize);
             } else {
-                loader = ClassLoader.getSystemClassLoader();
+                stmt = conn.createStatement();
             }
-            Class.forName(driverClass, true, loader);
-            conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
-            stmt = conn.createStatement();
-        } catch (MalformedURLException e) {
-            throw new UdfRuntimeException("MalformedURLException to load class about " + driverPath, e);
-        } catch (ClassNotFoundException e) {
-            throw new UdfRuntimeException("Loading JDBC class error ClassNotFoundException about " + driverClass, e);
         } catch (SQLException e) {
-            throw new UdfRuntimeException("Connection JDBC class error about " + jdbcUrl, e);
-        } catch (Exception e) {
-            throw new UdfRuntimeException("unable to init jdbc executor Exception ", e);
+            throw new UdfRuntimeException("Initialize datasource failed: ", e);
         }
     }
 }
diff --git a/fe/pom.xml b/fe/pom.xml
index 4c228dd007..9f9888b680 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -188,6 +188,7 @@ under the License.
         <commons-io.version>2.6</commons-io.version>
         <json-simple.version>1.1.1</json-simple.version>
         <junit.version>5.8.2</junit.version>
+        <hikaricp.version>3.4.5</hikaricp.version>
         <thrift.version>0.13.0</thrift.version>
         <log4j2.version>2.18.0</log4j2.version>
         <metrics-core.version>4.0.2</metrics-core.version>
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index d4a4851d9b..5104045f23 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -357,9 +357,13 @@ struct TFunction {
   13: optional bool vectorized = false
 }
 
+enum TJdbcOperation {
+    READ,
+    WRITE
+}
+
 struct TJdbcExecutorCtorParams {
-  // Local path to the UDF's jar file
-  1: optional string jar_location_path
+  1: optional string statement
 
   // "jdbc:mysql://127.0.0.1:3307/test";
   2: optional string jdbc_url
@@ -372,6 +376,10 @@ struct TJdbcExecutorCtorParams {
 
   //"com.mysql.jdbc.Driver"
   5: optional string jdbc_driver_class
+
+  6: optional i32 batch_size
+
+  7: optional TJdbcOperation op
 }
 
 struct TJavaUdfExecutorCtorParams {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org