Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/01/24 09:58:59 UTC

[GitHub] [incubator-seatunnel] simon824 commented on a change in pull request #1136: [Bug] [seatunnel-connector-spark-redis] Solve user unauthorized problem and code optimization

simon824 commented on a change in pull request #1136:
URL: https://github.com/apache/incubator-seatunnel/pull/1136#discussion_r790578074



##########
File path: seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
##########
@@ -16,113 +16,111 @@
  */
 package org.apache.seatunnel.spark.sink
 
-import com.redislabs.provider.redis.toRedisContext
-import org.apache.seatunnel.common.config.CheckResult
+import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint, toRedisContext}
+import org.apache.seatunnel.common.config.{CheckConfigUtil, CheckResult}
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, Row}
 
-import scala.collection.JavaConverters.asScalaSetConverter
+import scala.collection.JavaConversions._
 import scala.collection.mutable
 
 class Redis extends SparkBatchSink with Logging {
 
   val redisConfig: mutable.Map[String, String] = mutable.Map()
   val redisPrefix = "redis"
   var redisSaveType: RedisSaveType.Value = _
+  val redisHost = "redis_host"
+  val redisPort = "redis_port"
+  val redisAuth = "redis_auth"
+  val redisDb = "redis_db"
+  val redisTimeout = "redis_timeout"
+  val REDIS_SAVE_TYPE = "redis_save_type"
   val HASH_NAME = "redis_hash_name"
   val SET_NAME = "redis_set_name"
   val ZSET_NAME = "redis_zset_name"
   val LIST_NAME = "redis_list_name"
-  val REDIS_SAVE_TYPE = "redis_save_type"
 
   override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
+    val redisConfigs = new RedisConfig(RedisEndpoint(
+      host = config.getString(redisHost),
+      port = config.getInt(redisPort),
+      auth = config.getString(redisAuth),
+      dbNum = config.getInt(redisDb),
+      timeout = config.getInt(redisTimeout))
+    )
+
     implicit val sc: SparkContext = env.getSparkSession.sparkContext
     redisSaveType match {
-      case RedisSaveType.KV => dealWithKV(data)
-      case RedisSaveType.HASH => dealWithHASH(data, redisConfig(HASH_NAME))
-      case RedisSaveType.SET => dealWithSet(data, redisConfig(SET_NAME))
-      case RedisSaveType.ZSET => dealWithZSet(data, redisConfig(ZSET_NAME))
-      case RedisSaveType.LIST => dealWithList(data, redisConfig(LIST_NAME))
+      case RedisSaveType.KV => dealWithKV(data)(sc = sc, redisConfig = redisConfigs)
+      case RedisSaveType.HASH => dealWithHASH(data, redisConfig(HASH_NAME))(sc = sc, redisConfig = redisConfigs)
+      case RedisSaveType.SET => dealWithSet(data, redisConfig(SET_NAME))(sc = sc, redisConfig = redisConfigs)
+      case RedisSaveType.ZSET => dealWithZSet(data, redisConfig(ZSET_NAME))(sc = sc, redisConfig = redisConfigs)
+      case RedisSaveType.LIST => dealWithList(data, redisConfig(LIST_NAME))(sc = sc, redisConfig = redisConfigs)
     }
   }
 
   override def checkConfig(): CheckResult = {
-    config.entrySet().asScala.filter(x => x.getKey.startsWith("redis")).foreach(entry => {
-      if (entry.getKey.equals("redis_save_type")) {
-        redisSaveType = RedisSaveType.withName(config.getString("redis_save_type"))
-      }
-      redisConfig.put(entry.getKey, config.getString(entry.getKey))
-    })
-
-    def checkParam(checkArr: Array[String]): CheckResult = {
-      val notExistConfig: Array[String] = checkArr.filter(checkItem => !config.hasPath(checkItem))
-      if (notExistConfig.isEmpty) {
-        CheckResult.success()
-      } else {
-        CheckResult.error(s"redis config is not enough please check config [${notExistConfig.mkString(",")}]")
-      }
-    }
+    CheckConfigUtil.checkAllExists(config, "redis_host", "redis_port", "redis_save_type")
 
-    val result = checkParam(Array("redis_save_type", "redis_host", "redis_port"))
-    if (!result.isSuccess) {
-      result
+    val saveTypeList = List("KV", "HASH", "SET", "ZSET", "LIST")
+    val saveType = config.getString("redis_save_type")
+
+    val bool = saveTypeList.contains(saveType.toUpperCase)
+    if (bool) {
+      redisSaveType = RedisSaveType.withName(config.getString("redis_save_type"))
+      CheckResult.success()
     } else {
-      redisSaveType match {
-        case RedisSaveType.KV => checkParam(Array())
-        case RedisSaveType.HASH => checkParam(Array(HASH_NAME))
-        case RedisSaveType.SET => checkParam(Array(SET_NAME))
-        case RedisSaveType.ZSET => checkParam(Array(ZSET_NAME))
-        case RedisSaveType.LIST => checkParam(Array(LIST_NAME))
-        case _ => CheckResult.error("Unknown redis config. redis_save_type must be in [KV HASH SET ZSET LIST]")
-      }
+      CheckResult.error("Unknown redis config. redis_save_type must be in [KV HASH SET ZSET LIST]")
     }
   }
 
   override def prepare(prepareEnv: SparkEnvironment): Unit = {
+    val defaultConfig = ConfigFactory.parseMap(
+      Map(
+        redisHost -> "localhost",
+        redisPort -> 6379,
+        redisAuth -> null,
+        redisDb -> 0,
+        redisTimeout -> 2000)
+    )
+    config = config.withFallback(defaultConfig)
+  }
 
-    val conf = prepareEnv.getSparkSession.conf
-    conf.set("spark.redis.host", config.getString("redis_host"))
-    conf.set("spark.redis.port", config.getString("redis_port"))
-    if (config.hasPath("redis.auth")) {
-      conf.set("spark.redis.auth", "passwd")
+  def checkParam(checkArr: Array[String]): CheckResult = {
+    val notExistConfig: Array[String] = checkArr.filter(checkItem => !config.hasPath(checkItem))

Review comment:
       We can remove `checkParam()`; it is no longer called now that `checkConfig()` validates the required keys with `CheckConfigUtil.checkAllExists`.
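
       For context, a minimal sketch of what `checkConfig()` could look like once `checkParam()` is dropped. Note that in the diff above the `CheckResult` returned by `CheckConfigUtil.checkAllExists` is discarded; the sketch below propagates it instead. This is an illustration, not the PR's final code: it assumes the surrounding `Redis` class and its imports, that `checkAllExists(config, keys*)` returns a `CheckResult` (as its use here suggests), and that `RedisSaveType` is a scala `Enumeration` (implied by `RedisSaveType.Value` and `withName` in the diff).

           override def checkConfig(): CheckResult = {
             // Fail fast if any required key is missing, and propagate the
             // result instead of discarding it as the current diff does.
             val required = CheckConfigUtil.checkAllExists(
               config, "redis_host", "redis_port", "redis_save_type")
             if (!required.isSuccess) {
               required
             } else {
               val saveType = config.getString("redis_save_type").toUpperCase
               // Validate the save type against the enumeration itself rather
               // than a hand-maintained List("KV", "HASH", ...).
               if (RedisSaveType.values.exists(_.toString == saveType)) {
                 redisSaveType = RedisSaveType.withName(saveType)
                 CheckResult.success()
               } else {
                 CheckResult.error("Unknown redis config. redis_save_type must be in [KV HASH SET ZSET LIST]")
               }
             }
           }

       Checking membership via `RedisSaveType.values` also keeps the allowed save types in one place, so adding a new type cannot silently drift out of sync with the validation list.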



