You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/16 07:01:43 UTC
[kylin] 12/15: add connect timeout and ext config from yml
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 28f684b6f9ab8d392fac17675376ab0b5814cd28
Author: Zhixiong Chen <ch...@apache.org>
AuthorDate: Thu Oct 27 15:42:25 2022 +0800
add connect timeout and ext config from yml
Co-authored-by: chenzhx <ch...@apache.io>
---
.../kap/newten/clickhouse/ClickHouseUtils.java | 4 +-
.../kap/clickhouse/ClickHouseStorage.java | 7 +++
.../kyligence/kap/clickhouse/job/ClickHouse.java | 11 +++-
.../management/ClickHouseConfigLoader.java | 2 +
.../kap/clickhouse/MockSecondStorage.java | 2 +
.../kap/secondstorage/config/ClusterInfo.java | 24 +++++++-
.../engine/spark/NLocalWithSparkSessionTest.java | 68 ++++++++++++----------
.../utils/HiveTransactionTableHelperTest.java | 17 +++++-
8 files changed, 96 insertions(+), 39 deletions(-)
diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java
index 0ca4863f8c..c6431ad78c 100644
--- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java
+++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java
@@ -275,7 +275,7 @@ public class ClickHouseUtils {
int pairNum = clickhouse.length / replica;
IntStream.range(0, pairNum).forEach(idx -> clusterNode.put("pair" + idx, new ArrayList<>()));
ClusterInfo cluster = new ClusterInfo().setKeepAliveTimeout("600000").setSocketTimeout("600000")
- .setCluster(clusterNode);
+ .setConnectTimeout("3000").setExtConfig("maxWait=10").setCluster(clusterNode);
int i = 0;
for (JdbcDatabaseContainer<?> jdbcDatabaseContainer : clickhouse) {
Node node = new Node();
@@ -302,7 +302,7 @@ public class ClickHouseUtils {
int pairNum = clickhouse.length / replica;
IntStream.range(0, pairNum).forEach(idx -> clusterNode.put("pair" + idx, new ArrayList<>()));
ClusterInfo cluster = new ClusterInfo().setKeepAliveTimeout("600000").setSocketTimeout("600000")
- .setCluster(clusterNode);
+ .setConnectTimeout("3000").setExtConfig("maxWait=10").setCluster(clusterNode);
int i = 0;
for (JdbcDatabaseContainer<?> jdbcDatabaseContainer : clickhouse) {
Node node = new Node();
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java
index 0f4ebbfb83..9e5c578127 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java
@@ -178,6 +178,13 @@ public class ClickHouseStorage implements SecondStoragePlugin {
if (StringUtils.isNotEmpty(cluster.getSocketTimeout())) {
param.put(ClickHouse.SOCKET_TIMEOUT, cluster.getSocketTimeout());
}
+ if (StringUtils.isNotEmpty(cluster.getConnectTimeout())) {
+ int timeout = Integer.parseInt(cluster.getConnectTimeout()) / 1000;
+ param.put(ClickHouse.CONNECT_TIMEOUT, Integer.toString(timeout));
+ }
+ if (StringUtils.isNotEmpty(cluster.getExtConfig())) {
+ param.put(ClickHouse.EXT_CONFIG, cluster.getExtConfig());
+ }
if (StringUtils.isNotEmpty(node.getUser())) {
param.put(ClickHouse.USER, node.getUser());
}
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java
index 7a0100c17b..9664bde2e7 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java
@@ -52,6 +52,8 @@ public class ClickHouse implements Closeable {
public static final String USER = "user";
public static final String SOCKET_TIMEOUT = "socket_timeout";
public static final String KEEP_ALIVE_TIMEOUT = "keepAliveTimeout";
+ public static final String CONNECT_TIMEOUT = "connect_timeout";
+ public static final String EXT_CONFIG = "extConfig";
public static final String CLIENT_NAME = "client_name";
private final String shardName;
@@ -110,8 +112,15 @@ public class ClickHouse implements Closeable {
if (!param.isEmpty()) {
base.append('?');
List<String> paramList = new ArrayList<>();
- param.forEach((name, value) -> paramList.add(name + "=" + value));
+ param.forEach((name, value) -> {
+ if (!ClickHouse.EXT_CONFIG.equals(name)){
+ paramList.add(name + "=" + value);
+ }
+ });
base.append(String.join("&", paramList));
+ if(param.get(ClickHouse.EXT_CONFIG) != null) {
+ base.append("&").append(param.get(ClickHouse.EXT_CONFIG));
+ }
}
return base.toString();
}
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java
index a9f2794065..06c4143328 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java
@@ -66,10 +66,12 @@ public class ClickHouseConfigLoader implements SecondStorageConfigLoader {
clusterDesc.addPropertyParameters("cluster", String.class, List.class);
clusterDesc.addPropertyParameters("socketTimeout", String.class);
clusterDesc.addPropertyParameters("keepAliveTimeout", String.class);
+ clusterDesc.addPropertyParameters("connectTimeout", String.class);
clusterDesc.addPropertyParameters("installPath", String.class);
clusterDesc.addPropertyParameters("logPath", String.class);
clusterDesc.addPropertyParameters("userName", String.class);
clusterDesc.addPropertyParameters("password", String.class);
+ clusterDesc.addPropertyParameters("extConfig", String.class);
constructor.addTypeDescription(clusterDesc);
val nodeDesc = new TypeDescription(Node.class);
nodeDesc.addPropertyParameters("name", String.class);
diff --git a/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java b/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java
index 60191478a6..50f58b933a 100644
--- a/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java
+++ b/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java
@@ -49,6 +49,7 @@ public class MockSecondStorage {
ClusterInfo cluster = new ClusterInfo();
cluster.setKeepAliveTimeout("600000");
cluster.setSocketTimeout("600000");
+ cluster.setConnectTimeout("3000");
cluster.setCluster(Collections.emptyMap());
File file = File.createTempFile("clickhouse", ".yaml");
ClickHouseConfigLoader.getConfigYaml().dump(JsonUtil.readValue(JsonUtil.writeValueAsString(cluster),
@@ -62,6 +63,7 @@ public class MockSecondStorage {
ClusterInfo cluster = new ClusterInfo();
cluster.setKeepAliveTimeout("600000");
cluster.setSocketTimeout("600000");
+ cluster.setConnectTimeout("3000");
Map<String, List<Node>> clusterNodes = new HashMap<>();
cluster.setCluster(clusterNodes);
val it = nodes.listIterator();
diff --git a/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java b/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java
index e57010a1f9..ab29f99695 100644
--- a/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java
+++ b/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java
@@ -36,12 +36,14 @@ public class ClusterInfo {
private Map<String, List<Node>> cluster;
private String socketTimeout;
private String keepAliveTimeout;
+ private String connectTimeout;
private String installPath;
private String logPath;
//username of machine
private String userName;
private String password;
+ private String extConfig;
@JsonIgnore
public List<Node> getNodes() {
@@ -93,6 +95,24 @@ public class ClusterInfo {
return keepAliveTimeout;
}
+ public String getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ public ClusterInfo setConnectTimeout(String connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ return this;
+ }
+
+ public String getExtConfig() {
+ return extConfig;
+ }
+
+ public ClusterInfo setExtConfig(String extConfig) {
+ this.extConfig = extConfig;
+ return this;
+ }
+
public ClusterInfo setKeepAliveTimeout(String keepAliveTimeout) {
this.keepAliveTimeout = keepAliveTimeout;
return this;
@@ -135,11 +155,13 @@ public class ClusterInfo {
public ClusterInfo(ClusterInfo cluster) {
this.cluster = Maps.newHashMap(cluster.getCluster());
this.keepAliveTimeout = cluster.getKeepAliveTimeout();
- this.socketTimeout = cluster.getKeepAliveTimeout();
+ this.socketTimeout = cluster.getSocketTimeout();
+ this.connectTimeout = cluster.getConnectTimeout();
this.logPath = cluster.getLogPath();
this.userName = cluster.getUserName();
this.password = cluster.getPassword();
this.installPath = cluster.getInstallPath();
+ this.extConfig = cluster.getExtConfig();
}
public boolean emptyCluster() {
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
index 326386b865..df13963a68 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
@@ -17,10 +17,15 @@
*/
package org.apache.kylin.engine.spark;
-import com.google.common.base.Preconditions;
-import org.apache.kylin.engine.spark.job.NSparkMergingJob;
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.util.Shell;
@@ -28,6 +33,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.TempMetadataBuilder;
+import org.apache.kylin.engine.spark.job.NSparkMergingJob;
import org.apache.kylin.engine.spark.merger.AfterMergeOrRefreshResourceMerger;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.ExecutableState;
@@ -60,14 +66,10 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.sparkproject.guava.collect.Sets;
-import java.io.File;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Objects;
-import java.util.Random;
-import java.util.Set;
+import com.google.common.base.Preconditions;
+
+import lombok.val;
+import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase implements Serializable {
@@ -129,7 +131,9 @@ public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase imple
@AfterClass
public static void afterClass() {
- ss.close();
+ if (ss != null) {
+ ss.close();
+ }
FileUtils.deleteQuietly(new File("../kap-it/metastore_db"));
}
@@ -211,29 +215,29 @@ public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase imple
if (type.isIntegerFamily())
switch (type.getName()) {
- case "tinyint":
- return DataTypes.ByteType;
- case "smallint":
- return DataTypes.ShortType;
- case "integer":
- case "int4":
- return DataTypes.IntegerType;
- default:
- return DataTypes.LongType;
+ case "tinyint":
+ return DataTypes.ByteType;
+ case "smallint":
+ return DataTypes.ShortType;
+ case "integer":
+ case "int4":
+ return DataTypes.IntegerType;
+ default:
+ return DataTypes.LongType;
}
if (type.isNumberFamily())
switch (type.getName()) {
- case "float":
- return DataTypes.FloatType;
- case "double":
- return DataTypes.DoubleType;
- default:
- if (type.getPrecision() == -1 || type.getScale() == -1) {
- return DataTypes.createDecimalType(19, 4);
- } else {
- return DataTypes.createDecimalType(type.getPrecision(), type.getScale());
- }
+ case "float":
+ return DataTypes.FloatType;
+ case "double":
+ return DataTypes.DoubleType;
+ default:
+ if (type.getPrecision() == -1 || type.getScale() == -1) {
+ return DataTypes.createDecimalType(19, 4);
+ } else {
+ return DataTypes.createDecimalType(type.getPrecision(), type.getScale());
+ }
}
if (type.isStringFamily())
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java
index 185c9fc0ff..dca40c9f68 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java
@@ -37,8 +37,11 @@ import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Maps;
@@ -51,6 +54,15 @@ public class HiveTransactionTableHelperTest extends NLocalWithSparkSessionTest {
private final String STORAGE_DFS_DIR = "/test";
private final String FILED_DELIMITER = "|";
+ @BeforeClass
+ public static void beforeClass() {
+ if (SparderEnv.isSparkAvailable()) {
+ SparderEnv.getSparkSession().close();
+ }
+ SparkSession.clearActiveSession();
+ SparkSession.clearDefaultSession();
+ }
+
@Before
public void setup() {
{
@@ -72,9 +84,8 @@ public class HiveTransactionTableHelperTest extends NLocalWithSparkSessionTest {
SystemPropertiesCache.setProperty("kylin.source.provider.9",
"io.kyligence.kap.engine.spark.source.NSparkDataSource");
SystemPropertiesCache.setProperty("kylin.build.resource.read-transactional-table-enabled", "true");
- KylinBuildEnv kylinBuildEnv = KylinBuildEnv.getOrCreate(getTestConfig());
- NTableMetadataManager tableMgr = NTableMetadataManager
- .getInstance(getTestConfig(), "tdh");
+ KylinBuildEnv kylinBuildEnv = new KylinBuildEnv(getTestConfig());
+ NTableMetadataManager tableMgr = NTableMetadataManager.getInstance(getTestConfig(), "tdh");
TableDesc fact = tableMgr.getTableDesc("TDH_TEST.LINEORDER_PARTITION");
fact.setTransactional(true);
String result = HiveTransactionTableHelper.doGetQueryHiveTemporaryTableSql(fact, Maps.newHashMap(), "LO_ORDERKEY", kylinBuildEnv);