You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/23 14:44:16 UTC
[doris] 04/06: [vectorzied](jdbc) fix jdbc executor for get result by batch and memo… (#15843)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit a071827534cca966b87d1ea412ceb1c3f11058bb
Author: zhangstar333 <87...@users.noreply.github.com>
AuthorDate: Sat Jan 21 08:22:22 2023 +0800
[vectorzied](jdbc) fix jdbc executor for get result by batch and memo… (#15843)
1. The result set should be fetched in batches of the configured batch size.
2. Fix a memory leak.
---
be/src/vec/exec/vjdbc_connector.cpp | 6 +-
be/src/vec/exec/vjdbc_connector.h | 1 +
fe/java-udf/pom.xml | 5 --
.../main/java/org/apache/doris/udf/FakeDriver.java | 70 ++++++++++++++++
.../java/org/apache/doris/udf/JdbcExecutor.java | 96 ++++++++++++----------
5 files changed, 129 insertions(+), 49 deletions(-)
diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index 519ccbbc77..0eb6d6f2a3 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -370,8 +370,8 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
const std::string& column_name = slot_desc->col_name();
jobject column_data =
env->CallObjectMethod(block_obj, _executor_get_list_id, materialized_column_index);
- jint num_rows = env->CallIntMethod(column_data, _executor_get_list_size_id);
-
+ jint num_rows = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz,
+ _executor_block_rows_id);
for (int row = 0; row < num_rows; ++row) {
jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row);
RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc,
@@ -413,6 +413,8 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
_executor_close_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE,
_executor_has_next_id));
+ RETURN_IF_ERROR(
+ register_id(_executor_clazz, "getCurBlockRows", "()I", _executor_block_rows_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", JDBC_EXECUTOR_GET_BLOCK_SIGNATURE,
_executor_get_blocks_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateToLong",
diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h
index c1d416783c..4f05253d64 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -114,6 +114,7 @@ private:
jmethodID _executor_write_id;
jmethodID _executor_read_id;
jmethodID _executor_has_next_id;
+ jmethodID _executor_block_rows_id;
jmethodID _executor_get_blocks_id;
jmethodID _executor_get_types_id;
jmethodID _executor_get_arr_list_id;
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index 493015d49c..b37e9c4696 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -104,11 +104,6 @@ under the License.
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.zaxxer</groupId>
- <artifactId>HikariCP</artifactId>
- <version>${hikaricp.version}</version>
- </dependency>
</dependencies>
<build>
<finalName>java-udf</finalName>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
new file mode 100644
index 0000000000..94fbde6217
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+
+public class FakeDriver implements Driver {
+ private Driver driver;
+
+ FakeDriver(Driver driver) {
+ this.driver = driver;
+ }
+
+ @Override
+ public Connection connect(String url, Properties info) throws SQLException {
+ return this.driver.connect(url, info);
+ }
+
+ @Override
+ public boolean acceptsURL(String url) throws SQLException {
+ return this.driver.acceptsURL(url);
+ }
+
+ @Override
+ public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
+ return this.driver.getPropertyInfo(url, info);
+ }
+
+ @Override
+ public int getMajorVersion() {
+ return this.driver.getMajorVersion();
+ }
+
+ @Override
+ public int getMinorVersion() {
+ return this.driver.getMinorVersion();
+ }
+
+ @Override
+ public boolean jdbcCompliant() {
+ return this.driver.jdbcCompliant();
+ }
+
+ @Override
+ public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+ return null;
+ }
+}
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index d90aa4055a..e76e9e78e3 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -21,17 +21,20 @@ import org.apache.doris.thrift.TJdbcExecutorCtorParams;
import org.apache.doris.thrift.TJdbcOperation;
import com.google.common.base.Preconditions;
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
import org.apache.log4j.Logger;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
-import java.io.FileNotFoundException;
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.Date;
+import java.sql.Driver;
+import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -50,10 +53,12 @@ public class JdbcExecutor {
private Statement stmt = null;
private ResultSet resultSet = null;
private ResultSetMetaData resultSetMetaData = null;
- // Use HikariDataSource to help us manage the JDBC connections.
- private HikariDataSource dataSource = null;
private List<String> resultColumnTypeNames = null;
private int baseTypeInt = 0;
+ private URLClassLoader classLoader = null;
+ private List<List<Object>> block = null;
+ private int bacthSizeNum = 0;
+ private int curBlockRows = 0;
public JdbcExecutor(byte[] thriftParams) throws Exception {
TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -75,22 +80,31 @@ public class JdbcExecutor {
stmt.close();
}
if (conn != null) {
+ conn.clearWarnings();
conn.close();
}
- if (dataSource != null) {
- dataSource.close();
+ if (classLoader != null) {
+ classLoader.clearAssertionStatus();
+ classLoader.close();
}
resultSet = null;
stmt = null;
conn = null;
- dataSource = null;
+ classLoader = null;
}
public int read() throws UdfRuntimeException {
try {
resultSet = ((PreparedStatement) stmt).executeQuery();
resultSetMetaData = resultSet.getMetaData();
- return resultSetMetaData.getColumnCount();
+ int columnCount = resultSetMetaData.getColumnCount();
+ resultColumnTypeNames = new ArrayList<>(columnCount);
+ block = new ArrayList<>(columnCount);
+ for (int i = 0; i < columnCount; ++i) {
+ resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
+ block.add(Arrays.asList(new Object[bacthSizeNum]));
+ }
+ return columnCount;
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor sql has error: ", e);
}
@@ -111,17 +125,8 @@ public class JdbcExecutor {
}
}
- public List<String> getResultColumnTypeNames() throws UdfRuntimeException {
- try {
- int count = resultSetMetaData.getColumnCount();
- resultColumnTypeNames = new ArrayList<>(count);
- for (int i = 0; i < count; ++i) {
- resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
- }
- return resultColumnTypeNames;
- } catch (SQLException e) {
- throw new UdfRuntimeException("JDBC executor getResultColumnTypeNames has error: ", e);
- }
+ public List<String> getResultColumnTypeNames() {
+ return resultColumnTypeNames;
}
public List<Object> getArrayColumnData(Object object) throws UdfRuntimeException {
@@ -169,20 +174,15 @@ public class JdbcExecutor {
}
public List<List<Object>> getBlock(int batchSize) throws UdfRuntimeException {
- List<List<Object>> block = null;
try {
int columnCount = resultSetMetaData.getColumnCount();
- block = new ArrayList<>(columnCount);
- for (int i = 0; i < columnCount; ++i) {
- block.add(new ArrayList<>(batchSize));
- }
- int numRows = 0;
+ curBlockRows = 0;
do {
for (int i = 0; i < columnCount; ++i) {
- block.get(i).add(resultSet.getObject(i + 1));
+ block.get(i).set(curBlockRows, resultSet.getObject(i + 1));
}
- numRows++;
- } while (numRows < batchSize && resultSet.next());
+ curBlockRows++;
+ } while (curBlockRows < batchSize && resultSet.next());
} catch (SQLException e) {
throw new UdfRuntimeException("get next block failed: ", e);
} catch (Exception e) {
@@ -191,6 +191,10 @@ public class JdbcExecutor {
return block;
}
+ public int getCurBlockRows() {
+ return curBlockRows;
+ }
+
public boolean hasNext() throws UdfRuntimeException {
try {
if (resultSet == null) {
@@ -242,33 +246,41 @@ public class JdbcExecutor {
private void init(String driverUrl, String sql, int batchSize, String driverClass, String jdbcUrl, String jdbcUser,
String jdbcPassword, TJdbcOperation op) throws UdfRuntimeException {
try {
- ClassLoader parent = getClass().getClassLoader();
- ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, parent);
- Thread.currentThread().setContextClassLoader(classLoader);
- HikariConfig config = new HikariConfig();
- config.setDriverClassName(driverClass);
- config.setJdbcUrl(jdbcUrl);
- config.setUsername(jdbcUser);
- config.setPassword(jdbcPassword);
- config.setMaximumPoolSize(1);
+ File file = new File(driverUrl);
+ URL url = file.toURI().toURL();
+ classLoader = new URLClassLoader(new URL[] {url});
+ Driver driver = (Driver) Class.forName(driverClass, true, classLoader).getDeclaredConstructor()
+ .newInstance();
+ // In JDK 11 we can't call the addURL function via reflection to load the class, so load it this way instead.
+ // But DriverManager can't find the driverClass correctly when loaded like this, so register a fake driver that wraps it.
+ // https://www.kfu.com/~nsayer/Java/dyn-jdbc.html
+ DriverManager.registerDriver(new FakeDriver(driver));
+ conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
- dataSource = new HikariDataSource(config);
- conn = dataSource.getConnection();
if (op == TJdbcOperation.READ) {
conn.setAutoCommit(false);
Preconditions.checkArgument(sql != null);
stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
ResultSet.FETCH_FORWARD);
stmt.setFetchSize(batchSize);
+ bacthSizeNum = batchSize;
} else {
stmt = conn.createStatement();
}
- } catch (FileNotFoundException e) {
- throw new UdfRuntimeException("Can not find driver file: " + driverUrl, e);
+ } catch (ClassNotFoundException e) {
+ throw new UdfRuntimeException("ClassNotFoundException: " + driverClass, e);
} catch (MalformedURLException e) {
throw new UdfRuntimeException("MalformedURLException to load class about " + driverUrl, e);
} catch (SQLException e) {
throw new UdfRuntimeException("Initialize datasource failed: ", e);
+ } catch (InstantiationException e) {
+ throw new UdfRuntimeException("InstantiationException failed: ", e);
+ } catch (IllegalAccessException e) {
+ throw new UdfRuntimeException("IllegalAccessException failed: ", e);
+ } catch (InvocationTargetException e) {
+ throw new UdfRuntimeException("InvocationTargetException new instance failed: ", e);
+ } catch (NoSuchMethodException e) {
+ throw new UdfRuntimeException("NoSuchMethodException Load class failed: ", e);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org