You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/23 14:44:16 UTC

[doris] 04/06: [vectorzied](jdbc) fix jdbc executor for get result by batch and memo… (#15843)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a071827534cca966b87d1ea412ceb1c3f11058bb
Author: zhangstar333 <87...@users.noreply.github.com>
AuthorDate: Sat Jan 21 08:22:22 2023 +0800

    [vectorzied](jdbc) fix jdbc executor for get result by batch and memo… (#15843)
    
    1. Fetch the result set in batches of the configured batch size.
    2. Fix a memory leak.
---
 be/src/vec/exec/vjdbc_connector.cpp                |  6 +-
 be/src/vec/exec/vjdbc_connector.h                  |  1 +
 fe/java-udf/pom.xml                                |  5 --
 .../main/java/org/apache/doris/udf/FakeDriver.java | 70 ++++++++++++++++
 .../java/org/apache/doris/udf/JdbcExecutor.java    | 96 ++++++++++++----------
 5 files changed, 129 insertions(+), 49 deletions(-)

diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index 519ccbbc77..0eb6d6f2a3 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -370,8 +370,8 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
         const std::string& column_name = slot_desc->col_name();
         jobject column_data =
                 env->CallObjectMethod(block_obj, _executor_get_list_id, materialized_column_index);
-        jint num_rows = env->CallIntMethod(column_data, _executor_get_list_size_id);
-
+        jint num_rows = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz,
+                                                     _executor_block_rows_id);
         for (int row = 0; row < num_rows; ++row) {
             jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row);
             RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc,
@@ -413,6 +413,8 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
                                 _executor_close_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE,
                                 _executor_has_next_id));
+    RETURN_IF_ERROR(
+            register_id(_executor_clazz, "getCurBlockRows", "()I", _executor_block_rows_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", JDBC_EXECUTOR_GET_BLOCK_SIGNATURE,
                                 _executor_get_blocks_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateToLong",
diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h
index c1d416783c..4f05253d64 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -114,6 +114,7 @@ private:
     jmethodID _executor_write_id;
     jmethodID _executor_read_id;
     jmethodID _executor_has_next_id;
+    jmethodID _executor_block_rows_id;
     jmethodID _executor_get_blocks_id;
     jmethodID _executor_get_types_id;
     jmethodID _executor_get_arr_list_id;
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index 493015d49c..b37e9c4696 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -104,11 +104,6 @@ under the License.
             <version>${junit.version}</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>com.zaxxer</groupId>
-            <artifactId>HikariCP</artifactId>
-            <version>${hikaricp.version}</version>
-        </dependency>
     </dependencies>
     <build>
         <finalName>java-udf</finalName>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
new file mode 100644
index 0000000000..94fbde6217
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+/**
+ * A {@link Driver} that forwards every call to a wrapped driver instance.
+ *
+ * <p>{@link java.sql.DriverManager} only uses drivers whose class is visible from the caller's class loader,
+ * so a driver loaded by a separate class loader (such as one created for a user-supplied driver jar) cannot
+ * be used through it directly. Registering this wrapper, whose class is loaded together with its callers,
+ * works around that restriction. See https://www.kfu.com/~nsayer/Java/dyn-jdbc.html.
+ */
+public class FakeDriver implements Driver {
+    // The real driver that every call is forwarded to.
+    private final Driver driver;
+
+    /**
+     * Creates a wrapper around {@code driver}.
+     *
+     * @param driver the driver instance to forward calls to
+     */
+    FakeDriver(Driver driver) {
+        this.driver = driver;
+    }
+
+    @Override
+    public Connection connect(String url, Properties info) throws SQLException {
+        return this.driver.connect(url, info);
+    }
+
+    @Override
+    public boolean acceptsURL(String url) throws SQLException {
+        return this.driver.acceptsURL(url);
+    }
+
+    @Override
+    public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
+        return this.driver.getPropertyInfo(url, info);
+    }
+
+    @Override
+    public int getMajorVersion() {
+        return this.driver.getMajorVersion();
+    }
+
+    @Override
+    public int getMinorVersion() {
+        return this.driver.getMinorVersion();
+    }
+
+    @Override
+    public boolean jdbcCompliant() {
+        return this.driver.jdbcCompliant();
+    }
+
+    /**
+     * Returns the wrapped driver's parent logger. The {@link Driver} contract is to return the parent logger, or
+     * to throw {@link SQLFeatureNotSupportedException} when the driver does not use {@code java.util.logging};
+     * forwarding the call preserves whichever the wrapped driver does.
+     */
+    @Override
+    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+        return this.driver.getParentLogger();
+    }
+}
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index d90aa4055a..e76e9e78e3 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -21,17 +21,20 @@ import org.apache.doris.thrift.TJdbcExecutorCtorParams;
 import org.apache.doris.thrift.TJdbcOperation;
 
 import com.google.common.base.Preconditions;
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 
-import java.io.FileNotFoundException;
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
 import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.sql.Connection;
 import java.sql.Date;
+import java.sql.Driver;
+import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
@@ -50,10 +53,12 @@ public class JdbcExecutor {
     private Statement stmt = null;
     private ResultSet resultSet = null;
     private ResultSetMetaData resultSetMetaData = null;
-    // Use HikariDataSource to help us manage the JDBC connections.
-    private HikariDataSource dataSource = null;
     private List<String> resultColumnTypeNames = null;
     private int baseTypeInt = 0;
+    private URLClassLoader classLoader = null;
+    private List<List<Object>> block = null;
+    private int bacthSizeNum = 0;
+    private int curBlockRows = 0;
 
     public JdbcExecutor(byte[] thriftParams) throws Exception {
         TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -75,22 +80,31 @@ public class JdbcExecutor {
             stmt.close();
         }
         if (conn != null) {
+            conn.clearWarnings();
             conn.close();
         }
-        if (dataSource != null) {
-            dataSource.close();
+        if (classLoader != null) {
+            classLoader.clearAssertionStatus();
+            classLoader.close();
         }
         resultSet = null;
         stmt = null;
         conn = null;
-        dataSource = null;
+        classLoader = null;
     }
 
     public int read() throws UdfRuntimeException {
         try {
             resultSet = ((PreparedStatement) stmt).executeQuery();
             resultSetMetaData = resultSet.getMetaData();
-            return resultSetMetaData.getColumnCount();
+            int columnCount = resultSetMetaData.getColumnCount();
+            resultColumnTypeNames = new ArrayList<>(columnCount);
+            block = new ArrayList<>(columnCount);
+            for (int i = 0; i < columnCount; ++i) {
+                resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
+                block.add(Arrays.asList(new Object[bacthSizeNum]));
+            }
+            return columnCount;
         } catch (SQLException e) {
             throw new UdfRuntimeException("JDBC executor sql has error: ", e);
         }
@@ -111,17 +125,8 @@ public class JdbcExecutor {
         }
     }
 
-    public List<String> getResultColumnTypeNames() throws UdfRuntimeException {
-        try {
-            int count = resultSetMetaData.getColumnCount();
-            resultColumnTypeNames = new ArrayList<>(count);
-            for (int i = 0; i < count; ++i) {
-                resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
-            }
-            return resultColumnTypeNames;
-        } catch (SQLException e) {
-            throw new UdfRuntimeException("JDBC executor getResultColumnTypeNames has error: ", e);
-        }
+    public List<String> getResultColumnTypeNames() {
+        return resultColumnTypeNames;
     }
 
     public List<Object> getArrayColumnData(Object object) throws UdfRuntimeException {
@@ -169,20 +174,15 @@ public class JdbcExecutor {
     }
 
     public List<List<Object>> getBlock(int batchSize) throws UdfRuntimeException {
-        List<List<Object>> block = null;
         try {
             int columnCount = resultSetMetaData.getColumnCount();
-            block = new ArrayList<>(columnCount);
-            for (int i = 0; i < columnCount; ++i) {
-                block.add(new ArrayList<>(batchSize));
-            }
-            int numRows = 0;
+            curBlockRows = 0;
             do {
                 for (int i = 0; i < columnCount; ++i) {
-                    block.get(i).add(resultSet.getObject(i + 1));
+                    block.get(i).set(curBlockRows, resultSet.getObject(i + 1));
                 }
-                numRows++;
-            } while (numRows < batchSize && resultSet.next());
+                curBlockRows++;
+            } while (curBlockRows < batchSize && resultSet.next());
         } catch (SQLException e) {
             throw new UdfRuntimeException("get next block failed: ", e);
         } catch (Exception e) {
@@ -191,6 +191,10 @@ public class JdbcExecutor {
         return block;
     }
 
+    public int getCurBlockRows() {
+        return curBlockRows;
+    }
+
     public boolean hasNext() throws UdfRuntimeException {
         try {
             if (resultSet == null) {
@@ -242,33 +246,50 @@ public class JdbcExecutor {
     private void init(String driverUrl, String sql, int batchSize, String driverClass, String jdbcUrl, String jdbcUser,
             String jdbcPassword, TJdbcOperation op) throws UdfRuntimeException {
         try {
-            ClassLoader parent = getClass().getClassLoader();
-            ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, parent);
-            Thread.currentThread().setContextClassLoader(classLoader);
-            HikariConfig config = new HikariConfig();
-            config.setDriverClassName(driverClass);
-            config.setJdbcUrl(jdbcUrl);
-            config.setUsername(jdbcUser);
-            config.setPassword(jdbcPassword);
-            config.setMaximumPoolSize(1);
+            File file = new File(driverUrl);
+            URL url = file.toURI().toURL();
+            classLoader = new URLClassLoader(new URL[] {url});
+            Driver driver = (Driver) Class.forName(driverClass, true, classLoader).getDeclaredConstructor()
+                    .newInstance();
+            // In jdk11 addURL can't be called by reflection to load the driver class, so a separate class loader
+            // is used. DriverManager can't find a driver loaded that way, so it is wrapped in a FakeDriver.
+            // https://www.kfu.com/~nsayer/Java/dyn-jdbc.html
+            // Keep the wrapper registered only while opening the connection. A driver left registered stays in
+            // DriverManager's JVM-wide registry, pinning this executor's URLClassLoader (a class-loader leak), and
+            // may later be picked for another executor's connection after this class loader has been closed.
+            // NOTE(review): executors opening connections concurrently can still see each other's wrapper.
+            FakeDriver fakeDriver = new FakeDriver(driver);
+            DriverManager.registerDriver(fakeDriver);
+            try {
+                conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
+            } finally {
+                DriverManager.deregisterDriver(fakeDriver);
+            }
 
-            dataSource = new HikariDataSource(config);
-            conn = dataSource.getConnection();
             if (op == TJdbcOperation.READ) {
                 conn.setAutoCommit(false);
                 Preconditions.checkArgument(sql != null);
                 stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
                         ResultSet.FETCH_FORWARD);
                 stmt.setFetchSize(batchSize);
+                bacthSizeNum = batchSize;
             } else {
                 stmt = conn.createStatement();
             }
-        } catch (FileNotFoundException e) {
-            throw new UdfRuntimeException("Can not find driver file:  " + driverUrl, e);
+        } catch (ClassNotFoundException e) {
+            throw new UdfRuntimeException("ClassNotFoundException:  " + driverClass, e);
         } catch (MalformedURLException e) {
             throw new UdfRuntimeException("MalformedURLException to load class about " + driverUrl, e);
         } catch (SQLException e) {
             throw new UdfRuntimeException("Initialize datasource failed: ", e);
+        } catch (InstantiationException e) {
+            throw new UdfRuntimeException("InstantiationException failed: ", e);
+        } catch (IllegalAccessException e) {
+            throw new UdfRuntimeException("IllegalAccessException failed: ", e);
+        } catch (InvocationTargetException e) {
+            throw new UdfRuntimeException("InvocationTargetException new instance failed: ", e);
+        } catch (NoSuchMethodException e) {
+            throw new UdfRuntimeException("NoSuchMethodException Load class failed: ", e);
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org