You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/10/26 16:02:32 UTC
[doris] branch master updated: [Bug](jdbc) Fix memory leak for JDBC datasource (#13657)
This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3c95106d45 [Bug](jdbc) Fix memory leak for JDBC datasource (#13657)
3c95106d45 is described below
commit 3c95106d45163fd0c9c7df769b8d711b9c2f7cdf
Author: Gabriel <ga...@gmail.com>
AuthorDate: Thu Oct 27 00:02:25 2022 +0800
[Bug](jdbc) Fix memory leak for JDBC datasource (#13657)
---
be/src/common/config.h | 2 +
be/src/exec/odbc_connector.cpp | 2 +-
be/src/exec/odbc_connector.h | 2 +-
be/src/exec/odbc_scan_node.cpp | 2 +-
be/src/exec/table_connector.h | 4 +-
be/src/runtime/odbc_table_sink.cpp | 2 +-
be/src/util/jni-util.cpp | 14 +-
be/src/vec/exec/scan/new_jdbc_scanner.cpp | 3 +-
be/src/vec/exec/scan/new_odbc_scanner.cpp | 2 +-
be/src/vec/exec/vjdbc_connector.cpp | 151 +++++++++++----------
be/src/vec/exec/vjdbc_connector.h | 8 +-
be/src/vec/exec/vjdbc_scan_node.cpp | 2 +-
be/src/vec/exec/vodbc_scan_node.cpp | 2 +-
be/src/vec/sink/vjdbc_table_sink.cpp | 3 +-
be/src/vec/sink/vodbc_table_sink.cpp | 2 +-
dist/LICENSE-dist.txt | 2 +-
fe/java-udf/pom.xml | 5 +
.../java/org/apache/doris/udf/JdbcExecutor.java | 70 ++++++----
fe/pom.xml | 1 +
gensrc/thrift/Types.thrift | 12 +-
20 files changed, 175 insertions(+), 116 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index b30cc1b005..8a690c2c1d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -865,6 +865,8 @@ CONF_String(be_node_role, "mix");
// Hide the be config page for webserver.
CONF_Bool(hide_webserver_config_page, "false");
+CONF_String(jvm_max_heap_size, "1024M");
+
#ifdef BE_TEST
// test s3
CONF_String(test_s3_resource, "resource");
diff --git a/be/src/exec/odbc_connector.cpp b/be/src/exec/odbc_connector.cpp
index 0ffedf4f9a..826d8b8bb9 100644
--- a/be/src/exec/odbc_connector.cpp
+++ b/be/src/exec/odbc_connector.cpp
@@ -70,7 +70,7 @@ ODBCConnector::~ODBCConnector() {
}
}
-Status ODBCConnector::open() {
+Status ODBCConnector::open(RuntimeState* state, bool read) {
if (_is_open) {
LOG(INFO) << "this scanner already opened";
return Status::OK();
diff --git a/be/src/exec/odbc_connector.h b/be/src/exec/odbc_connector.h
index 41c7280741..2aabcb61b6 100644
--- a/be/src/exec/odbc_connector.h
+++ b/be/src/exec/odbc_connector.h
@@ -54,7 +54,7 @@ public:
explicit ODBCConnector(const ODBCConnectorParam& param);
~ODBCConnector() override;
- Status open() override;
+ Status open(RuntimeState* state, bool read = false) override;
// query for ODBC table
Status query() override;
diff --git a/be/src/exec/odbc_scan_node.cpp b/be/src/exec/odbc_scan_node.cpp
index c0cfd53bb4..84194c7510 100644
--- a/be/src/exec/odbc_scan_node.cpp
+++ b/be/src/exec/odbc_scan_node.cpp
@@ -106,7 +106,7 @@ Status OdbcScanNode::open(RuntimeState* state) {
}
RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(_odbc_scanner->open());
+ RETURN_IF_ERROR(_odbc_scanner->open(state));
RETURN_IF_ERROR(_odbc_scanner->query());
// check materialize slot num
diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h
index 06bdadd380..3fa9f5f5b1 100644
--- a/be/src/exec/table_connector.h
+++ b/be/src/exec/table_connector.h
@@ -38,7 +38,7 @@ public:
TableConnector(const TupleDescriptor* tuple_desc, const std::string& sql_str);
virtual ~TableConnector() = default;
- virtual Status open() = 0;
+ virtual Status open(RuntimeState* state, bool read = false) = 0;
// exec query for table
virtual Status query() = 0;
@@ -64,6 +64,8 @@ public:
std::u16string utf8_to_u16string(const char* first, const char* last);
+ virtual Status close() { return Status::OK(); }
+
protected:
bool _is_open;
bool _is_in_transaction;
diff --git a/be/src/runtime/odbc_table_sink.cpp b/be/src/runtime/odbc_table_sink.cpp
index 3191431138..a7c58d22d4 100644
--- a/be/src/runtime/odbc_table_sink.cpp
+++ b/be/src/runtime/odbc_table_sink.cpp
@@ -65,7 +65,7 @@ Status OdbcTableSink::open(RuntimeState* state) {
RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));
// create writer
_writer.reset(new ODBCConnector(_odbc_param));
- RETURN_IF_ERROR(_writer->open());
+ RETURN_IF_ERROR(_writer->open(state));
if (_use_transaction) {
RETURN_IF_ERROR(_writer->begin_trans());
}
diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index 384af3e414..e9a0673dc8 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -22,6 +22,7 @@
#include <jni_md.h>
#include <stdlib.h>
+#include "common/config.h"
#include "gutil/once.h"
#include "gutil/strings/substitute.h"
@@ -39,13 +40,16 @@ void FindOrCreateJavaVM() {
if (rv == 0) {
JNIEnv* env;
JavaVMInitArgs vm_args;
- JavaVMOption options[1];
- char* str = getenv("DORIS_JNI_CLASSPATH_PARAMETER");
- options[0].optionString = str;
+ JavaVMOption options[2];
+ char* cp = getenv("DORIS_JNI_CLASSPATH_PARAMETER");
+ options[0].optionString = cp;
+ std::string heap_size = fmt::format("-Xmx{}", config::jvm_max_heap_size);
+ options[1].optionString = const_cast<char*>(heap_size.c_str());
vm_args.version = JNI_VERSION_1_8;
vm_args.options = options;
- vm_args.nOptions = 1;
- vm_args.ignoreUnrecognized = JNI_TRUE;
+ vm_args.nOptions = 2;
+ // Set it to JNI_FALSE because JNI_TRUE will let JVM ignore the max size config.
+ vm_args.ignoreUnrecognized = JNI_FALSE;
jint res = JNI_CreateJavaVM(&g_vm, (void**)&env, &vm_args);
if (JNI_OK != res) {
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index 61c9ff53c5..2141d6b7e8 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -81,7 +81,7 @@ Status NewJdbcScanner::open(RuntimeState* state) {
}
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(VScanner::open(state));
- RETURN_IF_ERROR(_jdbc_connector->open());
+ RETURN_IF_ERROR(_jdbc_connector->open(state, true));
RETURN_IF_ERROR(_jdbc_connector->query());
return Status::OK();
}
@@ -147,6 +147,7 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
Status NewJdbcScanner::close(RuntimeState* state) {
RETURN_IF_ERROR(VScanner::close(state));
+ RETURN_IF_ERROR(_jdbc_connector->close());
return Status::OK();
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp
index a69ecb1f32..59c94cf887 100644
--- a/be/src/vec/exec/scan/new_odbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp
@@ -88,7 +88,7 @@ Status NewOdbcScanner::open(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(VScanner::open(state));
- RETURN_IF_ERROR(_odbc_connector->open());
+ RETURN_IF_ERROR(_odbc_connector->open(state));
RETURN_IF_ERROR(_odbc_connector->query());
// check materialize slot num
diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index 2551fd88c2..5cff86bf46 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -17,6 +17,7 @@
#include "vec/exec/vjdbc_connector.h"
#ifdef LIBJVM
+#include "common/status.h"
#include "exec/table_connector.h"
#include "gen_cpp/Types_types.h"
#include "gutil/strings/substitute.h"
@@ -30,7 +31,7 @@ namespace doris {
namespace vectorized {
const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/udf/JdbcExecutor";
const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V";
-const char* JDBC_EXECUTOR_QUERYSQL_SIGNATURE = "(Ljava/lang/String;)I";
+const char* JDBC_EXECUTOR_WRITE_SIGNATURE = "(Ljava/lang/String;)I";
const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";
const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;";
const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V";
@@ -39,24 +40,50 @@ const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE = "(Ljava/lang/Object;)J";
const char* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V";
JdbcConnector::JdbcConnector(const JdbcConnectorParam& param)
- : TableConnector(param.tuple_desc, param.query_string), _conn_param(param) {}
+ : TableConnector(param.tuple_desc, param.query_string),
+ _conn_param(param),
+ _closed(false) {}
JdbcConnector::~JdbcConnector() {
+ if (!_closed) {
+ close();
+ }
+}
+
+#define GET_BASIC_JAVA_CLAZZ(JAVA_TYPE, CPP_TYPE) \
+ RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, JAVA_TYPE, &_executor_##CPP_TYPE##_clazz));
+
+#define DELETE_BASIC_JAVA_CLAZZ_REF(CPP_TYPE) env->DeleteGlobalRef(_executor_##CPP_TYPE##_clazz);
+
+Status JdbcConnector::close() {
+ _closed = true;
if (!_is_open) {
- return;
+ return Status::OK();
}
if (_is_in_transaction) {
- abort_trans();
+ RETURN_IF_ERROR(abort_trans());
}
JNIEnv* env;
- Status status;
- RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+ RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_close_id);
- RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+ Status status = JniUtil::GetJniExceptionMsg(env);
env->DeleteGlobalRef(_executor_obj);
+ env->DeleteGlobalRef(_executor_clazz);
+ DELETE_BASIC_JAVA_CLAZZ_REF(object)
+ DELETE_BASIC_JAVA_CLAZZ_REF(uint8_t)
+ DELETE_BASIC_JAVA_CLAZZ_REF(int8_t)
+ DELETE_BASIC_JAVA_CLAZZ_REF(int16_t)
+ DELETE_BASIC_JAVA_CLAZZ_REF(int32_t)
+ DELETE_BASIC_JAVA_CLAZZ_REF(int64_t)
+ DELETE_BASIC_JAVA_CLAZZ_REF(float)
+ DELETE_BASIC_JAVA_CLAZZ_REF(double)
+ DELETE_BASIC_JAVA_CLAZZ_REF(string)
+ DELETE_BASIC_JAVA_CLAZZ_REF(list)
+#undef DELETE_BASIC_JAVA_CLAZZ_REF
+ return status;
}
-Status JdbcConnector::open() {
+Status JdbcConnector::open(RuntimeState* state, bool read) {
if (_is_open) {
LOG(INFO) << "this scanner of jdbc already opened";
return Status::OK();
@@ -65,16 +92,19 @@ Status JdbcConnector::open() {
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, JDBC_EXECUTOR_CLASS, &_executor_clazz));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/util/List", &_executor_list_clazz));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Object", &_executor_object_clazz));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Boolean", &_executor_uint8_t_clazz));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Byte", &_executor_int8_t_clazz));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Short", &_executor_int16_t_clazz));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Integer", &_executor_int32_t_clazz));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Long", &_executor_int64_t_clazz));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Float", &_executor_float_clazz));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Float", &_executor_double_clazz));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/String", &_executor_string_clazz));
+
+ GET_BASIC_JAVA_CLAZZ("java/util/List", list)
+ GET_BASIC_JAVA_CLAZZ("java/lang/Object", object)
+ GET_BASIC_JAVA_CLAZZ("java/lang/Boolean", uint8_t)
+ GET_BASIC_JAVA_CLAZZ("java/lang/Byte", int8_t)
+ GET_BASIC_JAVA_CLAZZ("java/lang/Short", int16_t)
+ GET_BASIC_JAVA_CLAZZ("java/lang/Integer", int32_t)
+ GET_BASIC_JAVA_CLAZZ("java/lang/Long", int64_t)
+ GET_BASIC_JAVA_CLAZZ("java/lang/Float", float)
+ GET_BASIC_JAVA_CLAZZ("java/lang/Double", double)
+ GET_BASIC_JAVA_CLAZZ("java/lang/String", string)
+
+#undef GET_BASIC_JAVA_CLAZZ
RETURN_IF_ERROR(_register_func_id(env));
// Add a scoped cleanup jni reference object. This cleans up local refs made below.
@@ -87,17 +117,23 @@ Status JdbcConnector::open() {
std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path,
_conn_param.driver_checksum, &local_location));
TJdbcExecutorCtorParams ctor_params;
- ctor_params.__set_jar_location_path(local_location);
+ ctor_params.__set_statement(_sql_str);
ctor_params.__set_jdbc_url(_conn_param.jdbc_url);
ctor_params.__set_jdbc_user(_conn_param.user);
ctor_params.__set_jdbc_password(_conn_param.passwd);
ctor_params.__set_jdbc_driver_class(_conn_param.driver_class);
+ ctor_params.__set_batch_size(read ? state->batch_size() : 0);
+ ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE);
jbyteArray ctor_params_bytes;
// Pushed frame will be popped when jni_frame goes out-of-scope.
RETURN_IF_ERROR(jni_frame.push(env));
RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
_executor_obj = env->NewObject(_executor_clazz, _executor_ctor_id, ctor_params_bytes);
+
+ jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr);
+ env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT);
+ env->DeleteLocalRef(ctor_params_bytes);
}
RETURN_ERROR_IF_EXC(env);
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, _executor_obj, &_executor_obj));
@@ -119,10 +155,8 @@ Status JdbcConnector::query() {
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
- jstring query_sql = env->NewStringUTF(_sql_str.c_str());
- jint colunm_count = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz,
- _executor_query_id, query_sql);
- env->DeleteLocalRef(query_sql);
+ jint colunm_count =
+ env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_read_id);
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
if (colunm_count != materialize_num) {
@@ -164,10 +198,14 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
for (int row = 0; row < num_rows; ++row) {
jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row);
_convert_column_data(env, cur_data, slot_desc, columns[column_index].get());
+ env->DeleteLocalRef(cur_data);
}
+ env->DeleteLocalRef(column_data);
materialized_column_index++;
}
+ // All Java objects returned by JNI functions are local references.
+ env->DeleteLocalRef(block_obj);
return JniUtil::GetJniExceptionMsg(env);
}
@@ -186,8 +224,9 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
RETURN_IF_ERROR(register_id(_executor_clazz, "<init>", JDBC_EXECUTOR_CTOR_SIGNATURE,
_executor_ctor_id));
- RETURN_IF_ERROR(register_id(_executor_clazz, "querySQL", JDBC_EXECUTOR_QUERYSQL_SIGNATURE,
- _executor_query_id));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "write", JDBC_EXECUTOR_WRITE_SIGNATURE,
+ _executor_write_id));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "read", "()I", _executor_read_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "close", JDBC_EXECUTOR_CLOSE_SIGNATURE,
_executor_close_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE,
@@ -232,48 +271,20 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj,
}
switch (slot_desc->type().type) {
- case TYPE_BOOLEAN: {
- uint8_t num = _jobject_to_uint8_t(env, jobj);
- reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(
- (uint8_t)num);
- break;
- }
- case TYPE_TINYINT: {
- int8_t num = _jobject_to_int8_t(env, jobj);
- reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(num);
- break;
+#define M(TYPE, CPP_TYPE, COLUMN_TYPE) \
+ case TYPE: { \
+ CPP_TYPE num = _jobject_to_##CPP_TYPE(env, jobj); \
+ reinterpret_cast<COLUMN_TYPE*>(col_ptr)->insert_value(num); \
+ break; \
}
- case TYPE_SMALLINT: {
- int16_t num = _jobject_to_int16_t(env, jobj);
- reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(num);
- break;
- }
-
- case TYPE_INT: {
- int32_t num = _jobject_to_int32_t(env, jobj);
- reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(num);
- break;
- }
-
- case TYPE_BIGINT: {
- int64_t num = _jobject_to_int64_t(env, jobj);
- reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num);
- break;
- }
-
- case TYPE_FLOAT: {
- float num = _jobject_to_float(env, jobj);
- reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(
- num);
- break;
- }
- case TYPE_DOUBLE: {
- double num = _jobject_to_double(env, jobj);
- reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
- num);
- break;
- }
-
+ M(TYPE_BOOLEAN, uint8_t, vectorized::ColumnVector<vectorized::UInt8>)
+ M(TYPE_TINYINT, int8_t, vectorized::ColumnVector<vectorized::Int8>)
+ M(TYPE_SMALLINT, int16_t, vectorized::ColumnVector<vectorized::Int16>)
+ M(TYPE_INT, int32_t, vectorized::ColumnVector<vectorized::Int32>)
+ M(TYPE_BIGINT, int64_t, vectorized::ColumnVector<vectorized::Int64>)
+ M(TYPE_FLOAT, float, vectorized::ColumnVector<vectorized::Float32>)
+ M(TYPE_DOUBLE, double, vectorized::ColumnVector<vectorized::Float64>)
+#undef M
case TYPE_STRING:
case TYPE_CHAR:
case TYPE_VARCHAR: {
@@ -317,7 +328,7 @@ Status JdbcConnector::exec_write_sql(const std::u16string& insert_stmt,
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
jstring query_sql = env->NewString((const jchar*)insert_stmt.c_str(), insert_stmt.size());
- env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_query_id, query_sql);
+ env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_write_id, query_sql);
env->DeleteLocalRef(query_sql);
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
return Status::OK();
@@ -325,13 +336,15 @@ Status JdbcConnector::exec_write_sql(const std::u16string& insert_stmt,
std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) {
jobject jstr = env->CallObjectMethod(jobj, _to_string_id);
- const jbyteArray stringJbytes =
- (jbyteArray)env->CallObjectMethod(jstr, _get_bytes_id, env->NewStringUTF("UTF-8"));
+ auto coding = env->NewStringUTF("UTF-8");
+ const jbyteArray stringJbytes = (jbyteArray)env->CallObjectMethod(jstr, _get_bytes_id, coding);
size_t length = (size_t)env->GetArrayLength(stringJbytes);
jbyte* pBytes = env->GetByteArrayElements(stringJbytes, nullptr);
std::string str = std::string((char*)pBytes, length);
env->ReleaseByteArrayElements(stringJbytes, pBytes, JNI_ABORT);
env->DeleteLocalRef(stringJbytes);
+ env->DeleteLocalRef(jstr);
+ env->DeleteLocalRef(coding);
return str;
}
diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h
index 04aa8982da..e6f957746f 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -42,7 +42,7 @@ public:
~JdbcConnector() override;
- Status open() override;
+ Status open(RuntimeState* state, bool read = false) override;
Status query() override;
@@ -56,6 +56,8 @@ public:
Status abort_trans() override; // should be call after transaction abort
Status finish_trans() override; // should be call after transaction commit
+ Status close() override;
+
private:
Status _register_func_id(JNIEnv* env);
Status _convert_column_data(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc,
@@ -65,13 +67,15 @@ private:
int64_t _jobject_to_datetime(JNIEnv* env, jobject jobj);
const JdbcConnectorParam& _conn_param;
+ bool _closed;
jclass _executor_clazz;
jclass _executor_list_clazz;
jclass _executor_object_clazz;
jclass _executor_string_clazz;
jobject _executor_obj;
jmethodID _executor_ctor_id;
- jmethodID _executor_query_id;
+ jmethodID _executor_write_id;
+ jmethodID _executor_read_id;
jmethodID _executor_has_next_id;
jmethodID _executor_get_blocks_id;
jmethodID _executor_close_id;
diff --git a/be/src/vec/exec/vjdbc_scan_node.cpp b/be/src/vec/exec/vjdbc_scan_node.cpp
index e857218e97..7d15603992 100644
--- a/be/src/vec/exec/vjdbc_scan_node.cpp
+++ b/be/src/vec/exec/vjdbc_scan_node.cpp
@@ -91,7 +91,7 @@ Status VJdbcScanNode::open(RuntimeState* state) {
}
RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(_jdbc_connector->open());
+ RETURN_IF_ERROR(_jdbc_connector->open(state, true));
RETURN_IF_ERROR(_jdbc_connector->query());
return Status::OK();
}
diff --git a/be/src/vec/exec/vodbc_scan_node.cpp b/be/src/vec/exec/vodbc_scan_node.cpp
index 7bdc44501f..994eedc926 100644
--- a/be/src/vec/exec/vodbc_scan_node.cpp
+++ b/be/src/vec/exec/vodbc_scan_node.cpp
@@ -100,7 +100,7 @@ Status VOdbcScanNode::open(RuntimeState* state) {
}
RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(_odbc_scanner->open());
+ RETURN_IF_ERROR(_odbc_scanner->open(state));
RETURN_IF_ERROR(_odbc_scanner->query());
// check materialize slot num
diff --git a/be/src/vec/sink/vjdbc_table_sink.cpp b/be/src/vec/sink/vjdbc_table_sink.cpp
index b4ad942a3d..4c428aa47b 100644
--- a/be/src/vec/sink/vjdbc_table_sink.cpp
+++ b/be/src/vec/sink/vjdbc_table_sink.cpp
@@ -58,7 +58,7 @@ Status VJdbcTableSink::open(RuntimeState* state) {
// create writer
_writer.reset(new JdbcConnector(_jdbc_param));
- RETURN_IF_ERROR(_writer->open());
+ RETURN_IF_ERROR(_writer->open(state, false));
if (_use_transaction) {
RETURN_IF_ERROR(_writer->begin_trans());
}
@@ -96,6 +96,7 @@ Status VJdbcTableSink::close(RuntimeState* state, Status exec_status) {
if (exec_status.ok() && _use_transaction) {
RETURN_IF_ERROR(_writer->finish_trans());
}
+ RETURN_IF_ERROR(_writer->close());
return DataSink::close(state, exec_status);
}
} // namespace vectorized
diff --git a/be/src/vec/sink/vodbc_table_sink.cpp b/be/src/vec/sink/vodbc_table_sink.cpp
index 695c06ac68..8d6a0596b2 100644
--- a/be/src/vec/sink/vodbc_table_sink.cpp
+++ b/be/src/vec/sink/vodbc_table_sink.cpp
@@ -48,7 +48,7 @@ Status VOdbcTableSink::open(RuntimeState* state) {
// create writer
_writer.reset(new ODBCConnector(_odbc_param));
- RETURN_IF_ERROR(_writer->open());
+ RETURN_IF_ERROR(_writer->open(state));
if (_use_transaction) {
RETURN_IF_ERROR(_writer->begin_trans());
}
diff --git a/dist/LICENSE-dist.txt b/dist/LICENSE-dist.txt
index 4403b5a519..02888b004b 100644
--- a/dist/LICENSE-dist.txt
+++ b/dist/LICENSE-dist.txt
@@ -882,7 +882,7 @@ The Apache Software License, Version 2.0
* Hibernate Validator Engine:
- org.hibernate:hibernate-validator:5.1.0.Final (http://validator.hibernate.org/hibernate-validator)
* HikariCP:
- - com.zaxxer:HikariCP:4.0.3 (https://github.com/brettwooldridge/HikariCP)
+ - com.zaxxer:HikariCP:3.4.5 (https://github.com/brettwooldridge/HikariCP)
* Hive Common:
- org.apache.hive:hive-common:2.3.7 (https://hive.apache.org/hive-common)
* Hive Llap Client:
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index dbcebd9ced..3b0595442d 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -70,6 +70,11 @@ under the License.
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP</artifactId>
+ <version>${hikaricp.version}</version>
+ </dependency>
</dependencies>
<build>
<finalName>java-udf</finalName>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index 4936bc5be3..49da4332c7 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -19,17 +19,20 @@ package org.apache.doris.udf;
import org.apache.doris.thrift.TJdbcExecutorCtorParams;
+import org.apache.doris.thrift.TJdbcOperation;
+import com.google.common.base.Preconditions;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
import org.apache.log4j.Logger;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
-import java.net.MalformedURLException;
-import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -43,11 +46,12 @@ import java.util.List;
public class JdbcExecutor {
private static final Logger LOG = Logger.getLogger(JdbcExecutor.class);
private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
- private URLClassLoader classLoader = null;
private Connection conn = null;
private Statement stmt = null;
private ResultSet resultSet = null;
private ResultSetMetaData resultSetMetaData = null;
+ // Use HikariDataSource to help us manage the JDBC connections.
+ private HikariDataSource dataSource = null;
public JdbcExecutor(byte[] thriftParams) throws Exception {
TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -57,8 +61,8 @@ public class JdbcExecutor {
} catch (TException e) {
throw new InternalException(e.getMessage());
}
- init(request.jar_location_path, request.jdbc_driver_class, request.jdbc_url, request.jdbc_user,
- request.jdbc_password);
+ init(request.statement, request.batch_size, request.jdbc_driver_class, request.jdbc_url, request.jdbc_user,
+ request.jdbc_password, request.op);
}
public void close() throws Exception {
@@ -71,12 +75,26 @@ public class JdbcExecutor {
if (conn != null) {
conn.close();
}
- if (classLoader != null) {
- classLoader.close();
+ if (dataSource != null) {
+ dataSource.close();
}
+ resultSet = null;
+ stmt = null;
+ conn = null;
+ dataSource = null;
}
- public int querySQL(String sql) throws UdfRuntimeException {
+ public int read() throws UdfRuntimeException {
+ try {
+ resultSet = ((PreparedStatement) stmt).executeQuery();
+ resultSetMetaData = resultSet.getMetaData();
+ return resultSetMetaData.getColumnCount();
+ } catch (SQLException e) {
+ throw new UdfRuntimeException("JDBC executor sql has error: ", e);
+ }
+ }
+
+ public int write(String sql) throws UdfRuntimeException {
try {
boolean res = stmt.execute(sql);
if (res) { // sql query
@@ -175,28 +193,40 @@ public class JdbcExecutor {
return time;
}
- private void init(String driverPath, String driverClass, String jdbcUrl, String jdbcUser, String jdbcPassword)
- throws UdfRuntimeException {
+ private void init(String sql, int batchSize, String driverClass, String jdbcUrl, String jdbcUser,
+ String jdbcPassword, TJdbcOperation op) throws UdfRuntimeException {
try {
- ClassLoader loader;
- if (driverPath != null) {
- ClassLoader parent = getClass().getClassLoader();
- classLoader = UdfUtils.getClassLoader(driverPath, parent);
- loader = classLoader;
+ HikariConfig config = new HikariConfig();
+ config.setDriverClassName(driverClass);
+ config.setJdbcUrl(jdbcUrl);
+ config.setUsername(jdbcUser);
+ config.setPassword(jdbcPassword);
+ config.setMaximumPoolSize(1);
+
+ dataSource = new HikariDataSource(config);
+ conn = dataSource.getConnection();
+ if (op == TJdbcOperation.READ) {
+ Preconditions.checkArgument(sql != null);
+ stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ stmt.setFetchSize(batchSize);
} else {
- loader = ClassLoader.getSystemClassLoader();
+ stmt = conn.createStatement();
}
- Class.forName(driverClass, true, loader);
- conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
- stmt = conn.createStatement();
- } catch (MalformedURLException e) {
- throw new UdfRuntimeException("MalformedURLException to load class about " + driverPath, e);
- } catch (ClassNotFoundException e) {
- throw new UdfRuntimeException("Loading JDBC class error ClassNotFoundException about " + driverClass, e);
} catch (SQLException e) {
- throw new UdfRuntimeException("Connection JDBC class error about " + jdbcUrl, e);
+ releaseDataSource();
+ throw new UdfRuntimeException("Initialize datasource failed: ", e);
} catch (Exception e) {
+ releaseDataSource();
throw new UdfRuntimeException("unable to init jdbc executor Exception ", e);
}
}
+
+ // When init() fails inside the constructor the executor object is never handed to the caller, so close()
+ // cannot run for it; shut the pool down here so its connection and housekeeping thread are not leaked.
+ private void releaseDataSource() {
+ if (dataSource != null) {
+ dataSource.close();
+ dataSource = null;
+ }
+ }
}
diff --git a/fe/pom.xml b/fe/pom.xml
index 4c228dd007..9f9888b680 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -188,6 +188,7 @@ under the License.
<commons-io.version>2.6</commons-io.version>
<json-simple.version>1.1.1</json-simple.version>
<junit.version>5.8.2</junit.version>
+ <hikaricp.version>3.4.5</hikaricp.version>
<thrift.version>0.13.0</thrift.version>
<log4j2.version>2.18.0</log4j2.version>
<metrics-core.version>4.0.2</metrics-core.version>
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index d4a4851d9b..5104045f23 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -357,9 +357,13 @@ struct TFunction {
13: optional bool vectorized = false
}
+enum TJdbcOperation {
+ READ,
+ WRITE
+}
+
struct TJdbcExecutorCtorParams {
- // Local path to the UDF's jar file
- 1: optional string jar_location_path
+ 1: optional string statement
// "jdbc:mysql://127.0.0.1:3307/test";
2: optional string jdbc_url
@@ -372,6 +376,10 @@ struct TJdbcExecutorCtorParams {
//"com.mysql.jdbc.Driver"
5: optional string jdbc_driver_class
+
+ 6: optional i32 batch_size
+
+ 7: optional TJdbcOperation op
}
struct TJavaUdfExecutorCtorParams {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org