You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/07 08:39:24 UTC
[incubator-seatunnel] branch dev updated: [Improve][Connector] clickhouse configuration extraction (#1671)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b50de38f [Improve][Connector] clickhouse configuration extraction (#1671)
b50de38f is described below
commit b50de38fcb6478349a69f7c62af891fc38368d93
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Thu Apr 7 16:39:18 2022 +0800
[Improve][Connector] clickhouse configuration extraction (#1671)
---
.../apache/seatunnel/spark/clickhouse/Config.scala | 109 +++++++++++++++++++++
.../spark/clickhouse/sink/Clickhouse.scala | 43 ++++----
.../spark/clickhouse/sink/ClickhouseFile.scala | 47 ++++-----
3 files changed, 155 insertions(+), 44 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/Config.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/Config.scala
new file mode 100644
index 00000000..75eb715f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/Config.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.clickhouse
+
+/**
+ * Configuration keys for the Clickhouse and ClickhouseFile sink connectors.
+ */
+object Config extends Serializable {
+
+ /**
+ * Key for the bulk size used with Clickhouse JDBC (an int; the Clickhouse sink defaults to 20000).
+ */
+ val BULK_SIZE = "bulk_size"
+
+ /**
+ * Key for the Clickhouse JDBC retry setting (an int, presumably a retry count; default is 1).
+ */
+ val RETRY = "retry"
+
+ /**
+ * Key for the list of Clickhouse fields; when absent, the sinks use the DataFrame field names.
+ */
+ val FIELDS = "fields"
+
+ /**
+ * Key for the Clickhouse server host (the Clickhouse sink splits the value on ':').
+ */
+ val HOST = "host"
+
+ /**
+ * Key for the Clickhouse table name.
+ */
+ val TABLE = "table"
+
+ /**
+ * Key for the Clickhouse database name.
+ */
+ val DATABASE = "database"
+
+ /**
+ * Key for the Clickhouse server username (passed to the connection properties as "user").
+ */
+ val USERNAME = "username"
+
+ /**
+ * Key for the Clickhouse server password; also the password key inside each node_pass entry.
+ */
+ val PASSWORD = "password"
+
+ /**
+ * Key for the boolean split-mode flag, used when the table has a distributed engine.
+ */
+ val SPLIT_MODE = "split_mode"
+
+ /**
+ * Key for the sharding key used to split the data when split_mode is true.
+ */
+ val SHARDING_KEY = "sharding_key"
+
+ /**
+ * Key for the list of retry codes (ints) used with Clickhouse JDBC; empty by default.
+ */
+ val RETRY_CODES = "retry_codes"
+
+ /**
+ * Key for the path of the clickhouse-local program used by the ClickhouseFile sink connector.
+ */
+ val CLICKHOUSE_LOCAL_PATH = "clickhouse_local_path"
+
+ /**
+ * Key for the method used to copy Clickhouse files.
+ */
+ val COPY_METHOD = "copy_method"
+
+ /**
+ * Key for the size (an int) of each batch of temporary data read into a local file.
+ */
+ val TMP_BATCH_CACHE_LINE = "tmp_batch_cache_line"
+
+ /**
+ * Key for the boolean flag stating that the Clickhouse server nodes need no password.
+ */
+ val NODE_FREE_PASSWORD = "node_free_password"
+
+ /**
+ * Key for the list of per-node entries, each holding a node_address and a password.
+ */
+ val NODE_PASS = "node_pass"
+
+ /**
+ * Key, inside each node_pass entry, for the address of a Clickhouse server node.
+ */
+ val NODE_ADDRESS = "node_address"
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
index f62714f2..bb7dd890 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
@@ -34,6 +34,7 @@ import org.apache.seatunnel.common.config.TypesafeConfigUtils.{extractSubConfig,
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.seatunnel.spark.clickhouse.Config.{BULK_SIZE, DATABASE, FIELDS, HOST, PASSWORD, RETRY, RETRY_CODES, SHARDING_KEY, SPLIT_MODE, TABLE, USERNAME}
import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse.{Shard, acceptedClickHouseSchema, distributedEngine, getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, getClusterShardList, getDefaultValue, getRowShard}
import org.apache.spark.sql.{Dataset, Row}
import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl, ClickHousePreparedStatementImpl}
@@ -69,10 +70,10 @@ class Clickhouse extends SparkBatchSink {
override def output(data: Dataset[Row], environment: SparkEnvironment): Unit = {
val dfFields = data.schema.fieldNames
- val bulkSize = config.getInt("bulk_size")
- val retry = config.getInt("retry")
+ val bulkSize = config.getInt(BULK_SIZE)
+ val retry = config.getInt(RETRY)
- if (!config.hasPath("fields")) {
+ if (!config.hasPath(FIELDS)) {
fields = dfFields.toList
initSQL = initPrepareSQL()
}
@@ -109,7 +110,7 @@ class Clickhouse extends SparkBatchSink {
}
override def checkConfig(): CheckResult = {
- var checkResult = checkAllExists(config, "host", "table", "database", "username", "password")
+ var checkResult = checkAllExists(config, HOST, TABLE, DATABASE, USERNAME, PASSWORD)
if (checkResult.isSuccess) {
if (hasSubConfig(config, clickhousePrefix)) {
extractSubConfig(config, clickhousePrefix, false).entrySet().foreach(e => {
@@ -117,20 +118,20 @@ class Clickhouse extends SparkBatchSink {
})
}
- properties.put("user", config.getString("username"))
- properties.put("password", config.getString("password"))
+ properties.put("user", config.getString(USERNAME))
+ properties.put("password", config.getString(PASSWORD))
- if (config.hasPath("split_mode")) {
- splitMode = config.getBoolean("split_mode")
+ if (config.hasPath(SPLIT_MODE)) {
+ splitMode = config.getBoolean(SPLIT_MODE)
}
- val database = config.getString("database")
- val host = config.getString("host")
+ val database = config.getString(DATABASE)
+ val host = config.getString(HOST)
val conn = getClickhouseConnection(host, database, properties)
val hostAndPort = host.split(":")
- this.table = config.getString("table")
+ this.table = config.getString(TABLE)
this.tableSchema = getClickHouseSchema(conn, this.table).toMap
if (splitMode) {
- val tableName = config.getString("table")
+ val tableName = config.getString(TABLE)
val localTable = getClickHouseDistributedTable(conn, database, tableName)
if (Objects.isNull(localTable)) {
CheckResult.error(s"split mode only support table which engine is '$distributedEngine' at now")
@@ -143,8 +144,8 @@ class Clickhouse extends SparkBatchSink {
weight += elem.shardWeight
}
this.shardWeightCount = weight
- if (config.hasPath("sharding_key") && StringUtils.isNotEmpty(config.getString("sharding_key"))) {
- this.shardKey = config.getString("sharding_key")
+ if (config.hasPath(SHARDING_KEY) && StringUtils.isNotEmpty(config.getString(SHARDING_KEY))) {
+ this.shardKey = config.getString(SHARDING_KEY)
}
}
} else {
@@ -160,8 +161,8 @@ class Clickhouse extends SparkBatchSink {
this.shardKeyType = this.tableSchema(this.shardKey)
}
}
- if (this.config.hasPath("fields")) {
- this.fields = config.getStringList("fields")
+ if (this.config.hasPath(FIELDS)) {
+ this.fields = config.getStringList(FIELDS)
checkResult = acceptedClickHouseSchema(this.fields.toList, this.tableSchema, this.table)
}
}
@@ -169,18 +170,18 @@ class Clickhouse extends SparkBatchSink {
}
override def prepare(env: SparkEnvironment): Unit = {
- if (config.hasPath("fields")) {
+ if (config.hasPath(FIELDS)) {
this.initSQL = initPrepareSQL()
}
val defaultConfig = ConfigFactory.parseMap(
Map(
- "bulk_size" -> 20000,
+ BULK_SIZE -> 20000,
// "retry_codes" -> util.Arrays.asList(ClickHouseErrorCode.NETWORK_ERROR.code),
- "retry_codes" -> util.Arrays.asList(),
- "retry" -> 1))
+ RETRY_CODES -> util.Arrays.asList(),
+ RETRY -> 1))
config = config.withFallback(defaultConfig)
- retryCodes = config.getIntList("retry_codes")
+ retryCodes = config.getIntList(RETRY_CODES)
}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
index 676fb5d0..14e06327 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
@@ -26,6 +26,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.seatunnel.spark.clickhouse.Config.{CLICKHOUSE_LOCAL_PATH, COPY_METHOD, DATABASE, FIELDS, HOST, NODE_ADDRESS, NODE_FREE_PASSWORD, NODE_PASS, PASSWORD, SHARDING_KEY, TABLE, TMP_BATCH_CACHE_LINE, USERNAME}
import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse._
import org.apache.seatunnel.spark.clickhouse.sink.ClickhouseFile.{CLICKHOUSE_FILE_PREFIX, LOGGER, UUID_LENGTH, getClickhouseTableInfo}
import org.apache.seatunnel.spark.clickhouse.sink.Table
@@ -66,7 +67,7 @@ class ClickhouseFile extends SparkBatchSink {
override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
- if (!config.hasPath("fields")) {
+ if (!config.hasPath(FIELDS)) {
this.fields = data.schema.fieldNames.toList
}
@@ -208,23 +209,23 @@ class ClickhouseFile extends SparkBatchSink {
}
override def checkConfig(): CheckResult = {
- var checkResult = checkAllExists(config, "host", "table", "database", "username", "password",
- "clickhouse_local_path")
+ var checkResult = checkAllExists(config, HOST, TABLE, DATABASE, USERNAME, PASSWORD,
+ CLICKHOUSE_LOCAL_PATH)
if (checkResult.isSuccess) {
- clickhouseLocalPath = config.getString("clickhouse_local_path")
- properties.put("user", config.getString("username"))
- properties.put("password", config.getString("password"))
- val host = config.getString("host")
- val database = config.getString("database")
- val table = config.getString("table")
+ clickhouseLocalPath = config.getString(CLICKHOUSE_LOCAL_PATH)
+ properties.put("user", config.getString(USERNAME))
+ properties.put("password", config.getString(PASSWORD))
+ val host = config.getString(HOST)
+ val database = config.getString(DATABASE)
+ val table = config.getString(TABLE)
val conn = getClickhouseConnection(host, database, properties)
- if (config.hasPath("copy_method")) {
- this.copyFileMethod = getCopyMethod(config.getString("copy_method"))
+ if (config.hasPath(COPY_METHOD)) {
+ this.copyFileMethod = getCopyMethod(config.getString(COPY_METHOD))
}
- if (config.hasPath("tmp_batch_cache_line")) {
- this.tmpBatchCacheLine = config.getInt("tmp_batch_cache_line")
+ if (config.hasPath(TMP_BATCH_CACHE_LINE)) {
+ this.tmpBatchCacheLine = config.getInt(TMP_BATCH_CACHE_LINE)
}
val (result, tableInfo) = getClickhouseTableInfo(conn, database, table)
@@ -233,16 +234,16 @@ class ClickhouseFile extends SparkBatchSink {
} else {
this.table = tableInfo
tableInfo.initTableInfo(host, conn)
- tableInfo.initShardDataPath(config.getString("username"), config.getString("password"))
+ tableInfo.initShardDataPath(config.getString(USERNAME), config.getString(PASSWORD))
// check config of node password whether completed or not
- if (config.hasPath("node_free_password") && config.getBoolean("node_free_password")) {
+ if (config.hasPath(NODE_FREE_PASSWORD) && config.getBoolean(NODE_FREE_PASSWORD)) {
this.freePass = true
- } else if (config.hasPath("node_pass")) {
- val nodePass = config.getObjectList("node_pass")
+ } else if (config.hasPath(NODE_PASS)) {
+ val nodePass = config.getObjectList(NODE_PASS)
val nodePassMap = mutable.Map[String, String]()
nodePass.foreach(np => {
- val address = np.toConfig.getString("node_address")
- val password = np.toConfig.getString("password")
+ val address = np.toConfig.getString(NODE_ADDRESS)
+ val password = np.toConfig.getString(PASSWORD)
nodePassMap(address) = password
})
this.nodePass = nodePassMap.toMap
@@ -254,13 +255,13 @@ class ClickhouseFile extends SparkBatchSink {
}
if (checkResult.isSuccess) {
// check sharding method
- if (config.hasPath("sharding_key") && StringUtils.isNotEmpty(config.getString("sharding_key"))) {
- this.table.shardKey = config.getString("sharding_key")
+ if (config.hasPath(SHARDING_KEY) && StringUtils.isNotEmpty(config.getString(SHARDING_KEY))) {
+ this.table.shardKey = config.getString(SHARDING_KEY)
}
checkResult = this.table.prepareShardInfo(conn)
if (checkResult.isSuccess) {
- if (this.config.hasPath("fields")) {
- this.fields = config.getStringList("fields").toList
+ if (this.config.hasPath(FIELDS)) {
+ this.fields = config.getStringList(FIELDS).toList
checkResult = acceptedClickHouseSchema(this.fields, JavaConversions.mapAsScalaMap(this.table
.tableSchema).toMap, this.table.name)
}