You are viewing a plain text version of this content. The canonical link to it is not reproduced in this text version.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/03/21 15:10:40 UTC

[incubator-seatunnel] branch dev updated: Fix config param issues of spark redis sink (#1524)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c0b8250  Fix config param issues of spark redis sink (#1524)
c0b8250 is described below

commit c0b825065cf8105155ee78d3e71c17b76ee1cb52
Author: DingPengfei <di...@qq.com>
AuthorDate: Mon Mar 21 23:10:36 2022 +0800

    Fix config param issues of spark redis sink (#1524)
---
 .../src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
index d67b846..ea47e98 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
@@ -22,7 +22,7 @@ import org.apache.seatunnel.common.config.{CheckConfigUtil, CheckResult}
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
-import org.apache.seatunnel.spark.common.Constants.{AUTH, DATA_TYPE, DB_NUM, DEFAULT_AUTH, DEFAULT_DB_NUM, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT, HASH_NAME, HOST, LIST_NAME, PORT, SET_NAME, TIMEOUT, ZSET_NAME}
+import org.apache.seatunnel.spark.common.Constants.{AUTH, DATA_TYPE, DB_NUM, DEFAULT_AUTH, DEFAULT_DATA_TYPE, DEFAULT_DB_NUM, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT, HASH_NAME, HOST, LIST_NAME, PORT, SET_NAME, TIMEOUT, ZSET_NAME}
 import org.apache.seatunnel.spark.common.RedisDataType
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
@@ -38,7 +38,7 @@ class Redis extends SparkBatchSink with Logging {
     val redisConfigs = new RedisConfig(RedisEndpoint(
       host = config.getString(HOST),
       port = config.getInt(PORT),
-      auth = config.getString(AUTH),
+      auth = if (config.getIsNull(AUTH)) null else config.getString(AUTH),
       dbNum = config.getInt(DB_NUM),
       timeout = config.getInt(TIMEOUT)
     ))
@@ -57,9 +57,8 @@ class Redis extends SparkBatchSink with Logging {
 
   override def checkConfig(): CheckResult = {
     CheckConfigUtil.checkAllExists(config, HOST, PORT)
-
-    val dataType = config.getString(DATA_TYPE)
-    if (dataType != null) {
+    if (config.hasPath(DATA_TYPE)) {
+      val dataType = config.getString(DATA_TYPE)
       val dataTypeList = List("KV", "HASH", "SET", "ZSET", "LIST")
       val bool = dataTypeList.contains(dataType.toUpperCase)
       if (!bool) {
@@ -79,6 +78,7 @@ class Redis extends SparkBatchSink with Logging {
         PORT -> DEFAULT_PORT,
         AUTH -> DEFAULT_AUTH,
         DB_NUM -> DEFAULT_DB_NUM,
+        DATA_TYPE -> DEFAULT_DATA_TYPE,
         TIMEOUT -> DEFAULT_TIMEOUT
       ))
     config = config.withFallback(defaultConfig)