You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/06/21 10:37:50 UTC

[GitHub] [dolphinscheduler] ruanwenjun commented on a diff in pull request #10526: add base metadata query (#5954)

ruanwenjun commented on code in PR #10526:
URL: https://github.com/apache/dolphinscheduler/pull/10526#discussion_r902438408


##########
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientManager.java:
##########
@@ -46,34 +47,38 @@ public class DataSourceClientProvider {
             .build();
     private DataSourcePluginManager dataSourcePluginManager;
 
-    private DataSourceClientProvider() {
+    private DataSourceClientManager() {
         initDataSourcePlugin();
     }
 
     private static class DataSourceClientProviderHolder {
-        private static final DataSourceClientProvider INSTANCE = new DataSourceClientProvider();
+        private static final DataSourceClientManager INSTANCE = new DataSourceClientManager();
     }
 
-    public static DataSourceClientProvider getInstance() {
+    public static DataSourceClientManager getInstance() {
         return DataSourceClientProviderHolder.INSTANCE;
     }
 
-    public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {
+    public DataSourceClient getDataSource(DbType dbType, ConnectionParam connectionParam) {
         BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
         String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
         logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId);
 
-        DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {
-            Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();
-            DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());
-            if (null == dataSourceChannel) {
-                throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getDescp()));
-            }
-            return dataSourceChannel.createDataSourceClient(baseConnectionParam, dbType);
-        });
-        return dataSourceClient.getConnection();
+        try {
+            return uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {
+                Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();
+                DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());
+                if (null == dataSourceChannel) {
+                    throw DataSourceException.getInstance(String.format("datasource plugin '%s' is not found", dbType.getDescp()));

Review Comment:
   ```suggestion
                       throw new DataSourceException(String.format("datasource plugin '%s' is not found", dbType.getDescp()));
   ```
   Wouldn't it be better to construct the exception directly with `new`?



##########
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java:
##########
@@ -101,6 +117,140 @@ public void checkClient() {
         }
     }
 
+    @Override
+    public List<String> getDatabaseList(String databasePattern) {
+        return this.jdbcTemplate.queryForList(getDatabaseListSql(databasePattern), String.class);
+    }
+
+    protected String getDatabaseListSql(String databasePattern) {
+        throw new UnsupportedOperationException("NOT_SUPPORT");
+    }
+
+    @Override
+    public List<String> getTableList(String dbName, String schemaName, String tablePattern) {
+        return this.jdbcTemplate.queryForList(getTableListSql(dbName, schemaName, tablePattern), String.class);
+    }
+
+    protected String getTableListSql(String dbName, String schemaName, String tablePattern) {
+        throw new UnsupportedOperationException("NOT_SUPPORT");
+    }
+
+    @Override
+    public List<Map<String, Object>> getTableStruct(String dbName, String schemaName, String tableName) {
+        throw new UnsupportedOperationException("NOT_SUPPORT");
+    }
+
+    @Override
+    public MutableTriple<Map<String, String>, List<Map<String, Object>>, List<Map<String, String>>> executeSql(
+        String dbName, String schemaName, Boolean oneSession, String querySql) {
+        List<String> sqlList = assemblingSql(dbName, schemaName, querySql);
+        JdbcTemplate jdbcTemplate = getJdbcTemplate(oneSession);
+        return executeSqlListReturnLast(jdbcTemplate, sqlList);
+    }
+
+    protected JdbcTemplate getJdbcTemplate(Boolean oneSession) {
+        return this.jdbcTemplate;
+    }
+
+    protected String switchEnvironment(String dbName, String schemaName) {
+        return null;
+    }
+
+    private List<String> assemblingSql(String dbName, String schemaName, String querySql) {
+        return Lists.asList(switchEnvironment(dbName, schemaName), querySql.split(";")).stream()
+            .filter(StringUtils::isNotBlank)
+            .collect(Collectors.toList());
+    }
+
+    private String getColumnName(String columnName) {
+        return columnName.contains(".") ? columnName.split("\\.", 2)[1] : columnName;
+    }
+
+    protected MutableTriple<Map<String, String>, List<Map<String, Object>>, List<Map<String, String>>>
+    executeSqlListReturnLast(JdbcTemplate jdbcTemplate, List<String> querySqlList) {
+        Stopwatch stopwatchForAll = Stopwatch.createStarted();
+        try {
+            return jdbcTemplate.execute(

Review Comment:
   Can we directly call `jdbcTemplate.query` for each given SQL statement instead?



##########
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java:
##########
@@ -101,6 +117,140 @@ public void checkClient() {
         }
     }
 
+    @Override
+    public List<String> getDatabaseList(String databasePattern) {
+        return this.jdbcTemplate.queryForList(getDatabaseListSql(databasePattern), String.class);
+    }
+
+    protected String getDatabaseListSql(String databasePattern) {
+        throw new UnsupportedOperationException("NOT_SUPPORT");
+    }
+
+    @Override
+    public List<String> getTableList(String dbName, String schemaName, String tablePattern) {
+        return this.jdbcTemplate.queryForList(getTableListSql(dbName, schemaName, tablePattern), String.class);
+    }
+
+    protected String getTableListSql(String dbName, String schemaName, String tablePattern) {
+        throw new UnsupportedOperationException("NOT_SUPPORT");
+    }
+
+    @Override
+    public List<Map<String, Object>> getTableStruct(String dbName, String schemaName, String tableName) {
+        throw new UnsupportedOperationException("NOT_SUPPORT");
+    }
+
+    @Override
+    public MutableTriple<Map<String, String>, List<Map<String, Object>>, List<Map<String, String>>> executeSql(

Review Comment:
   Can we return a POJO here instead of a `MutableTriple`?



##########
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/MetaStoreClientPool.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Manages a pool of RetryingMetaStoreClient connections. If the connection pool is empty
+ * a new client is created and added to the pool. The idle pool can expand till a maximum
+ * size of MAX_HMS_CONNECTION_POOL_SIZE, beyond which the connections are closed.
+ *
+ * This default implementation reads the Hive metastore configuration from the HiveConf
+ * object passed in the c'tor. If you are looking for a temporary HMS instance created
+ * from scratch for unit tests, refer to EmbeddedMetastoreClientPool class. It mocks an
+ * actual HMS by creating a temporary Derby backend database on the fly. It should not
+ * be used for production Catalog server instances.
+ */
+public class MetaStoreClientPool {

Review Comment:
   Do we need to add license attribution for this file? It looks very similar to a file from another project.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org