You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2021/12/26 13:44:59 UTC

[GitHub] [dolphinscheduler] caishunfeng commented on a change in pull request #7624: [feature-7623][plugin] Refactored data source plug-in loading

caishunfeng commented on a change in pull request #7624:
URL: https://github.com/apache/dolphinscheduler/pull/7624#discussion_r775241949



##########
File path: dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
##########
@@ -17,57 +17,153 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
 
-import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
-import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
-import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
-import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import org.apache.dolphinscheduler.plugin.datasource.api.exception.DataSourceException;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.ClassLoaderUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.ThreadContextClassLoader;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
 import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
+import org.apache.dolphinscheduler.spi.datasource.JdbcConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import java.sql.Connection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.io.FilenameFilter;
+import java.net.MalformedURLException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
 public class DataSourceClientProvider {
     private static final Logger logger = LoggerFactory.getLogger(DataSourceClientProvider.class);
 
-    private static final Map<String, DataSourceClient> uniqueId2dataSourceClientMap = new ConcurrentHashMap<>();
+    private static final JdbcDriverManager jdbcDriverManagerInstance = JdbcDriverManager.getInstance();
 
-    private DataSourcePluginManager dataSourcePluginManager;
+    public static DataSourceClient createDataSourceClient(JdbcConnectionParam connectionParam) {
+        logger.info("Creating the createDataSourceClient. JdbcUrl: {} ", connectionParam.getJdbcUrl());
 
-    private DataSourceClientProvider() {
-        initDataSourcePlugin();
-    }
+        //Check jdbc driver location
+        checkDriverLocation(connectionParam);
+
+        logger.info("Creating the ClassLoader for the jdbc driver and plugin.");
+        ClassLoader driverClassLoader = getDriverClassLoader(connectionParam);
 
-    private static class DataSourceClientProviderHolder {
-        private static final DataSourceClientProvider INSTANCE = new DataSourceClientProvider();
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(driverClassLoader)) {
+            return createDataSourceClientWithClassLoader(connectionParam);
+        }
     }
 
-    public static DataSourceClientProvider getInstance() {
-        return DataSourceClientProviderHolder.INSTANCE;
+    protected static void checkDriverLocation(JdbcConnectionParam connectionParam) {
+        final String driverLocation = connectionParam.getDriverLocation();
+        if (StringUtils.isBlank(driverLocation)) {
+            logger.warn("No jdbc driver provide,will use randomly driver jar for {}.", connectionParam.getDbType().getDescp());
+            connectionParam.setDriverLocation(jdbcDriverManagerInstance.getDefaultDriverPluginPath(connectionParam.getDbType().getDescp()));
+        }
     }
 
-    public Connection getConnection(DbType dbType, ConnectionParam connectionParam) {
-        BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
-        String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
-        logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId);
+    protected static ClassLoader getDriverClassLoader(JdbcConnectionParam connectionParam) {
+        FilenameFilter filenameFilter = (dir, name) -> name != null && name.endsWith(".jar");
+        ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
+        ClassLoader classLoader;
+
+        HashSet<String> paths = Sets.newHashSet();
+        if (StringUtils.isNotBlank(connectionParam.getDriverLocation())) {
+            logger.info("Driver location: {}", connectionParam.getDriverLocation());
+            paths.add(connectionParam.getDriverLocation());
+        }
+        try {
+            classLoader = ClassLoaderUtils.getCustomClassLoader(paths, threadClassLoader, filenameFilter);
+        } catch (final MalformedURLException e) {
+            throw DataSourceException.getInstance("Invalid jdbc driver location.", e);
+        }
+
+        //try loading jdbc driver
+        loadJdbcDriver(classLoader, connectionParam);
+
+        DbType dbType = connectionParam.getDbType();
+        String pluginPath = JdbcDriverManager.getInstance().getPluginPath(dbType);
+        logger.info("Plugin location: {}", pluginPath);
+        paths.add(pluginPath);
+
+        if (dbType == DbType.HIVE || dbType == DbType.SPARK) {
+            try {
+                Class.forName("org.apache.hadoop.conf.Configuration", true, classLoader);
+                Class.forName("org.apache.hadoop.security.UserGroupInformation", true, classLoader);
+                Class.forName("org.apache.hadoop.fs.FileSystem", true, classLoader);
+            } catch (ClassNotFoundException cnf) {
+                cnf.printStackTrace();

Review comment:
       Would it be better to add an error log here instead of printing the stack trace?

##########
File path: dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/ThreadContextClassLoader.java
##########
@@ -15,33 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.utils;
+package org.apache.dolphinscheduler.plugin.datasource.api.utils;
 
-import org.junit.Assert;
-import org.junit.Test;
+import java.io.Closeable;
 
-/**
- * hive conf utils test
- */
-public class HiveConfUtilsTest {
-
-    /**
-     * test is hive conf var
-     */
-    @Test
-    public void testIsHiveConfVar() {
-
-        String conf = "hive.exec.script.wrapper=123";
-        boolean hiveConfVar = HiveConfUtils.isHiveConfVar(conf);
-        Assert.assertTrue(hiveConfVar);
+public class ThreadContextClassLoader
+        implements Closeable {
+    private final ClassLoader threadContextClassLoader;
 
-        conf = "hive.test.v1=v1";
-        hiveConfVar = HiveConfUtils.isHiveConfVar(conf);
-        Assert.assertFalse(hiveConfVar);
-
-        conf = "tez.queue.name=tezQueue";
-        hiveConfVar = HiveConfUtils.isHiveConfVar(conf);
-        Assert.assertTrue(hiveConfVar);
+    public ThreadContextClassLoader(ClassLoader newThreadContextClassLoader) {

Review comment:
       It seems that the class loader is thread-level?
   What happens if a datasource thread is created but its context class loader is never set?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org