You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/03/21 15:10:40 UTC
[incubator-seatunnel] branch dev updated: Fix config param issues of spark redis sink (#1524)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c0b8250 Fix config param issues of spark redis sink (#1524)
c0b8250 is described below
commit c0b825065cf8105155ee78d3e71c17b76ee1cb52
Author: DingPengfei <di...@qq.com>
AuthorDate: Mon Mar 21 23:10:36 2022 +0800
Fix config param issues of spark redis sink (#1524)
---
.../src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
index d67b846..ea47e98 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
@@ -22,7 +22,7 @@ import org.apache.seatunnel.common.config.{CheckConfigUtil, CheckResult}
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
-import org.apache.seatunnel.spark.common.Constants.{AUTH, DATA_TYPE, DB_NUM, DEFAULT_AUTH, DEFAULT_DB_NUM, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT, HASH_NAME, HOST, LIST_NAME, PORT, SET_NAME, TIMEOUT, ZSET_NAME}
+import org.apache.seatunnel.spark.common.Constants.{AUTH, DATA_TYPE, DB_NUM, DEFAULT_AUTH, DEFAULT_DATA_TYPE, DEFAULT_DB_NUM, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT, HASH_NAME, HOST, LIST_NAME, PORT, SET_NAME, TIMEOUT, ZSET_NAME}
import org.apache.seatunnel.spark.common.RedisDataType
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
@@ -38,7 +38,7 @@ class Redis extends SparkBatchSink with Logging {
val redisConfigs = new RedisConfig(RedisEndpoint(
host = config.getString(HOST),
port = config.getInt(PORT),
- auth = config.getString(AUTH),
+ auth = if (config.getIsNull(AUTH)) null else config.getString(AUTH),
dbNum = config.getInt(DB_NUM),
timeout = config.getInt(TIMEOUT)
))
@@ -57,9 +57,8 @@ class Redis extends SparkBatchSink with Logging {
override def checkConfig(): CheckResult = {
CheckConfigUtil.checkAllExists(config, HOST, PORT)
-
- val dataType = config.getString(DATA_TYPE)
- if (dataType != null) {
+ if (config.hasPath(DATA_TYPE)) {
+ val dataType = config.getString(DATA_TYPE)
val dataTypeList = List("KV", "HASH", "SET", "ZSET", "LIST")
val bool = dataTypeList.contains(dataType.toUpperCase)
if (!bool) {
@@ -79,6 +78,7 @@ class Redis extends SparkBatchSink with Logging {
PORT -> DEFAULT_PORT,
AUTH -> DEFAULT_AUTH,
DB_NUM -> DEFAULT_DB_NUM,
+ DATA_TYPE -> DEFAULT_DATA_TYPE,
TIMEOUT -> DEFAULT_TIMEOUT
))
config = config.withFallback(defaultConfig)