You are viewing a plain text version of this content. A canonical (HTML) version of this message is available in the mailing-list archive; the link was not preserved in this plain-text rendering.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/16 07:01:43 UTC

[kylin] 12/15: add connect timeout and ext config from yml

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 28f684b6f9ab8d392fac17675376ab0b5814cd28
Author: Zhixiong Chen <ch...@apache.org>
AuthorDate: Thu Oct 27 15:42:25 2022 +0800

    add connect timeout and ext config from yml
    
    Co-authored-by: chenzhx <ch...@apache.io>
---
 .../kap/newten/clickhouse/ClickHouseUtils.java     |  4 +-
 .../kap/clickhouse/ClickHouseStorage.java          |  7 +++
 .../kyligence/kap/clickhouse/job/ClickHouse.java   | 11 +++-
 .../management/ClickHouseConfigLoader.java         |  2 +
 .../kap/clickhouse/MockSecondStorage.java          |  2 +
 .../kap/secondstorage/config/ClusterInfo.java      | 24 +++++++-
 .../engine/spark/NLocalWithSparkSessionTest.java   | 68 ++++++++++++----------
 .../utils/HiveTransactionTableHelperTest.java      | 17 +++++-
 8 files changed, 96 insertions(+), 39 deletions(-)

diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java
index 0ca4863f8c..c6431ad78c 100644
--- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java
+++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java
@@ -275,7 +275,7 @@ public class ClickHouseUtils {
         int pairNum = clickhouse.length / replica;
         IntStream.range(0, pairNum).forEach(idx -> clusterNode.put("pair" + idx, new ArrayList<>()));
         ClusterInfo cluster = new ClusterInfo().setKeepAliveTimeout("600000").setSocketTimeout("600000")
-                .setCluster(clusterNode);
+                .setConnectTimeout("3000").setExtConfig("maxWait=10").setCluster(clusterNode);
         int i = 0;
         for (JdbcDatabaseContainer<?> jdbcDatabaseContainer : clickhouse) {
             Node node = new Node();
@@ -302,7 +302,7 @@ public class ClickHouseUtils {
         int pairNum = clickhouse.length / replica;
         IntStream.range(0, pairNum).forEach(idx -> clusterNode.put("pair" + idx, new ArrayList<>()));
         ClusterInfo cluster = new ClusterInfo().setKeepAliveTimeout("600000").setSocketTimeout("600000")
-                .setCluster(clusterNode);
+                .setConnectTimeout("3000").setExtConfig("maxWait=10").setCluster(clusterNode);
         int i = 0;
         for (JdbcDatabaseContainer<?> jdbcDatabaseContainer : clickhouse) {
             Node node = new Node();
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java
index 0f4ebbfb83..9e5c578127 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java
@@ -178,6 +178,13 @@ public class ClickHouseStorage implements SecondStoragePlugin {
         if (StringUtils.isNotEmpty(cluster.getSocketTimeout())) {
             param.put(ClickHouse.SOCKET_TIMEOUT, cluster.getSocketTimeout());
         }
+        if (StringUtils.isNotEmpty(cluster.getConnectTimeout())) {
+            int timeout = Integer.parseInt(cluster.getConnectTimeout()) / 1000;
+            param.put(ClickHouse.CONNECT_TIMEOUT, Integer.toString(timeout));
+        }
+        if (StringUtils.isNotEmpty(cluster.getExtConfig())) {
+            param.put(ClickHouse.EXT_CONFIG, cluster.getExtConfig());
+        }
         if (StringUtils.isNotEmpty(node.getUser())) {
             param.put(ClickHouse.USER, node.getUser());
         }
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java
index 7a0100c17b..9664bde2e7 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java
@@ -52,6 +52,8 @@ public class ClickHouse implements Closeable {
     public static final String USER = "user";
     public static final String SOCKET_TIMEOUT = "socket_timeout";
     public static final String KEEP_ALIVE_TIMEOUT = "keepAliveTimeout";
+    public static final String CONNECT_TIMEOUT = "connect_timeout";
+    public static final String EXT_CONFIG = "extConfig";
     public static final String CLIENT_NAME = "client_name";
 
     private final String shardName;
@@ -110,8 +112,15 @@ public class ClickHouse implements Closeable {
         if (!param.isEmpty()) {
             base.append('?');
             List<String> paramList = new ArrayList<>();
-            param.forEach((name, value) -> paramList.add(name + "=" + value));
+            param.forEach((name, value) -> {
+              if (!ClickHouse.EXT_CONFIG.equals(name)){
+                paramList.add(name + "=" + value);
+              }
+            });
             base.append(String.join("&", paramList));
+            if(param.get(ClickHouse.EXT_CONFIG) != null) {
+                base.append("&").append(param.get(ClickHouse.EXT_CONFIG));
+            }
         }
         return base.toString();
     }
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java
index a9f2794065..06c4143328 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java
@@ -66,10 +66,12 @@ public class ClickHouseConfigLoader implements SecondStorageConfigLoader {
         clusterDesc.addPropertyParameters("cluster", String.class, List.class);
         clusterDesc.addPropertyParameters("socketTimeout", String.class);
         clusterDesc.addPropertyParameters("keepAliveTimeout", String.class);
+        clusterDesc.addPropertyParameters("connectTimeout", String.class);
         clusterDesc.addPropertyParameters("installPath", String.class);
         clusterDesc.addPropertyParameters("logPath", String.class);
         clusterDesc.addPropertyParameters("userName", String.class);
         clusterDesc.addPropertyParameters("password", String.class);
+        clusterDesc.addPropertyParameters("extConfig", String.class);
         constructor.addTypeDescription(clusterDesc);
         val nodeDesc = new TypeDescription(Node.class);
         nodeDesc.addPropertyParameters("name", String.class);
diff --git a/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java b/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java
index 60191478a6..50f58b933a 100644
--- a/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java
+++ b/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java
@@ -49,6 +49,7 @@ public class MockSecondStorage {
         ClusterInfo cluster = new ClusterInfo();
         cluster.setKeepAliveTimeout("600000");
         cluster.setSocketTimeout("600000");
+        cluster.setConnectTimeout("3000");
         cluster.setCluster(Collections.emptyMap());
         File file = File.createTempFile("clickhouse", ".yaml");
         ClickHouseConfigLoader.getConfigYaml().dump(JsonUtil.readValue(JsonUtil.writeValueAsString(cluster),
@@ -62,6 +63,7 @@ public class MockSecondStorage {
         ClusterInfo cluster = new ClusterInfo();
         cluster.setKeepAliveTimeout("600000");
         cluster.setSocketTimeout("600000");
+        cluster.setConnectTimeout("3000");
         Map<String, List<Node>> clusterNodes = new HashMap<>();
         cluster.setCluster(clusterNodes);
         val it = nodes.listIterator();
diff --git a/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java b/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java
index e57010a1f9..ab29f99695 100644
--- a/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java
+++ b/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java
@@ -36,12 +36,14 @@ public class ClusterInfo {
     private Map<String, List<Node>> cluster;
     private String socketTimeout;
     private String keepAliveTimeout;
+    private String connectTimeout;
     private String installPath;
     private String logPath;
 
     //username of machine
     private String userName;
     private String password;
+    private String extConfig;
 
     @JsonIgnore
     public List<Node> getNodes() {
@@ -93,6 +95,24 @@ public class ClusterInfo {
         return keepAliveTimeout;
     }
 
+    public String getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    public ClusterInfo setConnectTimeout(String connectTimeout) {
+        this.connectTimeout = connectTimeout;
+        return this;
+    }
+
+    public String getExtConfig() {
+        return extConfig;
+    }
+
+    public ClusterInfo setExtConfig(String extConfig) {
+        this.extConfig = extConfig;
+        return this;
+    }
+
     public ClusterInfo setKeepAliveTimeout(String keepAliveTimeout) {
         this.keepAliveTimeout = keepAliveTimeout;
         return this;
@@ -135,11 +155,13 @@ public class ClusterInfo {
     public ClusterInfo(ClusterInfo cluster) {
         this.cluster = Maps.newHashMap(cluster.getCluster());
         this.keepAliveTimeout = cluster.getKeepAliveTimeout();
-        this.socketTimeout = cluster.getKeepAliveTimeout();
+        this.socketTimeout = cluster.getSocketTimeout();
+        this.connectTimeout = cluster.getConnectTimeout();
         this.logPath = cluster.getLogPath();
         this.userName = cluster.getUserName();
         this.password = cluster.getPassword();
         this.installPath = cluster.getInstallPath();
+        this.extConfig = cluster.getExtConfig();
     }
 
     public boolean emptyCluster() {
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
index 326386b865..df13963a68 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
@@ -17,10 +17,15 @@
  */
 package org.apache.kylin.engine.spark;
 
-import com.google.common.base.Preconditions;
-import org.apache.kylin.engine.spark.job.NSparkMergingJob;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.util.Shell;
@@ -28,6 +33,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.common.util.TempMetadataBuilder;
+import org.apache.kylin.engine.spark.job.NSparkMergingJob;
 import org.apache.kylin.engine.spark.merger.AfterMergeOrRefreshResourceMerger;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -60,14 +66,10 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.sparkproject.guava.collect.Sets;
 
-import java.io.File;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Objects;
-import java.util.Random;
-import java.util.Set;
+import com.google.common.base.Preconditions;
+
+import lombok.val;
+import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase implements Serializable {
@@ -129,7 +131,9 @@ public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase imple
 
     @AfterClass
     public static void afterClass() {
-        ss.close();
+        if (ss != null) {
+            ss.close();
+        }
         FileUtils.deleteQuietly(new File("../kap-it/metastore_db"));
     }
 
@@ -211,29 +215,29 @@ public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase imple
 
         if (type.isIntegerFamily())
             switch (type.getName()) {
-                case "tinyint":
-                    return DataTypes.ByteType;
-                case "smallint":
-                    return DataTypes.ShortType;
-                case "integer":
-                case "int4":
-                    return DataTypes.IntegerType;
-                default:
-                    return DataTypes.LongType;
+            case "tinyint":
+                return DataTypes.ByteType;
+            case "smallint":
+                return DataTypes.ShortType;
+            case "integer":
+            case "int4":
+                return DataTypes.IntegerType;
+            default:
+                return DataTypes.LongType;
             }
 
         if (type.isNumberFamily())
             switch (type.getName()) {
-                case "float":
-                    return DataTypes.FloatType;
-                case "double":
-                    return DataTypes.DoubleType;
-                default:
-                    if (type.getPrecision() == -1 || type.getScale() == -1) {
-                        return DataTypes.createDecimalType(19, 4);
-                    } else {
-                        return DataTypes.createDecimalType(type.getPrecision(), type.getScale());
-                    }
+            case "float":
+                return DataTypes.FloatType;
+            case "double":
+                return DataTypes.DoubleType;
+            default:
+                if (type.getPrecision() == -1 || type.getScale() == -1) {
+                    return DataTypes.createDecimalType(19, 4);
+                } else {
+                    return DataTypes.createDecimalType(type.getPrecision(), type.getScale());
+                }
             }
 
         if (type.isStringFamily())
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java
index 185c9fc0ff..dca40c9f68 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java
@@ -37,8 +37,11 @@ import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
@@ -51,6 +54,15 @@ public class HiveTransactionTableHelperTest extends NLocalWithSparkSessionTest {
     private final String STORAGE_DFS_DIR = "/test";
     private final String FILED_DELIMITER = "|";
 
+    @BeforeClass
+    public static void beforeClass() {
+        if (SparderEnv.isSparkAvailable()) {
+            SparderEnv.getSparkSession().close();
+        }
+        SparkSession.clearActiveSession();
+        SparkSession.clearDefaultSession();
+    }
+
     @Before
     public void setup() {
         {
@@ -72,9 +84,8 @@ public class HiveTransactionTableHelperTest extends NLocalWithSparkSessionTest {
         SystemPropertiesCache.setProperty("kylin.source.provider.9",
                 "io.kyligence.kap.engine.spark.source.NSparkDataSource");
         SystemPropertiesCache.setProperty("kylin.build.resource.read-transactional-table-enabled", "true");
-        KylinBuildEnv kylinBuildEnv = KylinBuildEnv.getOrCreate(getTestConfig());
-        NTableMetadataManager tableMgr = NTableMetadataManager
-                .getInstance(getTestConfig(), "tdh");
+        KylinBuildEnv kylinBuildEnv = new KylinBuildEnv(getTestConfig());
+        NTableMetadataManager tableMgr = NTableMetadataManager.getInstance(getTestConfig(), "tdh");
         TableDesc fact = tableMgr.getTableDesc("TDH_TEST.LINEORDER_PARTITION");
         fact.setTransactional(true);
         String result = HiveTransactionTableHelper.doGetQueryHiveTemporaryTableSql(fact, Maps.newHashMap(), "LO_ORDERKEY", kylinBuildEnv);