Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/01/21 18:44:52 UTC

[GitHub] [incubator-seatunnel] wuchunfu opened a new pull request #1136: [Bug] [seatunnel-connector-spark-redis] Solve user unauthorized problem and code optimization

wuchunfu opened a new pull request #1136:
URL: https://github.com/apache/incubator-seatunnel/pull/1136


   Solve user unauthorized problem and code optimization
   
   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. In the case of new features or big changes,
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[SeaTunnel #XXXX] [component] Title of the pull request", where *SeaTunnel #XXXX* should be replaced by the actual issue number.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   close #1135
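   
   For context, the unauthorized error came from the old prepare() visible in the
   diff below: it checked the wrong key ("redis.auth" instead of "redis_auth") and
   set "spark.redis.auth" to the literal string "passwd" rather than the configured
   password, so writes to a password-protected Redis were rejected. A minimal
   sketch of a sink configuration that exercises the fix (the key names are taken
   from the diff; the block name and values are hypothetical):
   
       # hypothetical SeaTunnel sink block
       Redis {
         redis_host = "redis.example.com"
         redis_port = 6379
         redis_auth = "my-password"   # now actually forwarded to the connection
         redis_db = 0
         redis_timeout = 2000
         redis_save_type = "KV"
       }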
   ## Check list
   
   * [x] Code changes are covered with tests, or tests are not needed for this reason:
   * [x] If any new JAR binary packages are added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/developement/NewLicenseGuide.md)
   * [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] RickyHuo commented on pull request #1136: [Bug] [seatunnel-connector-spark-redis] Solve user unauthorized problem and code optimization

Posted by GitBox <gi...@apache.org>.
RickyHuo commented on pull request #1136:
URL: https://github.com/apache/incubator-seatunnel/pull/1136#issuecomment-1019078413


   PTAL @an-shi-chi-fan 





[GitHub] [incubator-seatunnel] wuchunfu commented on a change in pull request #1136: [Bug] [seatunnel-connector-spark-redis] Solve user unauthorized problem and code optimization

Posted by GitBox <gi...@apache.org>.
wuchunfu commented on a change in pull request #1136:
URL: https://github.com/apache/incubator-seatunnel/pull/1136#discussion_r790596361



##########
File path: seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
##########
@@ -16,113 +16,111 @@
  */
 package org.apache.seatunnel.spark.sink
 
-import com.redislabs.provider.redis.toRedisContext
-import org.apache.seatunnel.common.config.CheckResult
+import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint, toRedisContext}
+import org.apache.seatunnel.common.config.{CheckConfigUtil, CheckResult}
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, Row}
 
-import scala.collection.JavaConverters.asScalaSetConverter
+import scala.collection.JavaConversions._
 import scala.collection.mutable
 
 class Redis extends SparkBatchSink with Logging {
 
   val redisConfig: mutable.Map[String, String] = mutable.Map()
   val redisPrefix = "redis"
   var redisSaveType: RedisSaveType.Value = _
+  val redisHost = "redis_host"
+  val redisPort = "redis_port"
+  val redisAuth = "redis_auth"
+  val redisDb = "redis_db"
+  val redisTimeout = "redis_timeout"
+  val REDIS_SAVE_TYPE = "redis_save_type"
   val HASH_NAME = "redis_hash_name"
   val SET_NAME = "redis_set_name"
   val ZSET_NAME = "redis_zset_name"
   val LIST_NAME = "redis_list_name"
-  val REDIS_SAVE_TYPE = "redis_save_type"
 
   override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
+    val redisConfigs = new RedisConfig(RedisEndpoint(
+      host = config.getString(redisHost),
+      port = config.getInt(redisPort),
+      auth = config.getString(redisAuth),
+      dbNum = config.getInt(redisDb),
+      timeout = config.getInt(redisTimeout))
+    )
+
     implicit val sc: SparkContext = env.getSparkSession.sparkContext
     redisSaveType match {
-      case RedisSaveType.KV => dealWithKV(data)
-      case RedisSaveType.HASH => dealWithHASH(data, redisConfig(HASH_NAME))
-      case RedisSaveType.SET => dealWithSet(data, redisConfig(SET_NAME))
-      case RedisSaveType.ZSET => dealWithZSet(data, redisConfig(ZSET_NAME))
-      case RedisSaveType.LIST => dealWithList(data, redisConfig(LIST_NAME))
+      case RedisSaveType.KV => dealWithKV(data)(sc = sc, redisConfig = redisConfigs)
+      case RedisSaveType.HASH => dealWithHASH(data, redisConfig(HASH_NAME))(sc = sc, redisConfig = redisConfigs)
+      case RedisSaveType.SET => dealWithSet(data, redisConfig(SET_NAME))(sc = sc, redisConfig = redisConfigs)
+      case RedisSaveType.ZSET => dealWithZSet(data, redisConfig(ZSET_NAME))(sc = sc, redisConfig = redisConfigs)
+      case RedisSaveType.LIST => dealWithList(data, redisConfig(LIST_NAME))(sc = sc, redisConfig = redisConfigs)
     }
   }
 
   override def checkConfig(): CheckResult = {
-    config.entrySet().asScala.filter(x => x.getKey.startsWith("redis")).foreach(entry => {
-      if (entry.getKey.equals("redis_save_type")) {
-        redisSaveType = RedisSaveType.withName(config.getString("redis_save_type"))
-      }
-      redisConfig.put(entry.getKey, config.getString(entry.getKey))
-    })
-
-    def checkParam(checkArr: Array[String]): CheckResult = {
-      val notExistConfig: Array[String] = checkArr.filter(checkItem => !config.hasPath(checkItem))
-      if (notExistConfig.isEmpty) {
-        CheckResult.success()
-      } else {
-        CheckResult.error(s"redis config is not enough please check config [${notExistConfig.mkString(",")}]")
-      }
-    }
+    CheckConfigUtil.checkAllExists(config, "redis_host", "redis_port", "redis_save_type")
 
-    val result = checkParam(Array("redis_save_type", "redis_host", "redis_port"))
-    if (!result.isSuccess) {
-      result
+    val saveTypeList = List("KV", "HASH", "SET", "ZSET", "LIST")
+    val saveType = config.getString("redis_save_type")
+
+    val bool = saveTypeList.contains(saveType.toUpperCase)
+    if (bool) {
+      redisSaveType = RedisSaveType.withName(config.getString("redis_save_type"))
+      CheckResult.success()
     } else {
-      redisSaveType match {
-        case RedisSaveType.KV => checkParam(Array())
-        case RedisSaveType.HASH => checkParam(Array(HASH_NAME))
-        case RedisSaveType.SET => checkParam(Array(SET_NAME))
-        case RedisSaveType.ZSET => checkParam(Array(ZSET_NAME))
-        case RedisSaveType.LIST => checkParam(Array(LIST_NAME))
-        case _ => CheckResult.error("Unknown redis config. redis_save_type must be in [KV HASH SET ZSET LIST]")
-      }
+      CheckResult.error("Unknown redis config. redis_save_type must be in [KV HASH SET ZSET LIST]")
     }
   }
 
   override def prepare(prepareEnv: SparkEnvironment): Unit = {
+    val defaultConfig = ConfigFactory.parseMap(
+      Map(
+        redisHost -> "localhost",
+        redisPort -> 6379,
+        redisAuth -> null,
+        redisDb -> 0,
+        redisTimeout -> 2000)
+    )
+    config = config.withFallback(defaultConfig)
+  }
 
-    val conf = prepareEnv.getSparkSession.conf
-    conf.set("spark.redis.host", config.getString("redis_host"))
-    conf.set("spark.redis.port", config.getString("redis_port"))
-    if (config.hasPath("redis.auth")) {
-      conf.set("spark.redis.auth", "passwd")
+  def checkParam(checkArr: Array[String]): CheckResult = {
+    val notExistConfig: Array[String] = checkArr.filter(checkItem => !config.hasPath(checkItem))

Review comment:
       > we can remove `checkParam()`
   
   OK, I'll fix that later.
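   
   For readers of the diff above, a minimal sketch of the pattern this change
   introduces: build one explicit RedisConfig (including auth) and thread it into
   the spark-redis write helpers, instead of relying on spark.redis.* session
   properties. The dealWithKV body is illustrative; the real helpers live
   elsewhere in the connector, and the exact spark-redis signatures may vary by
   version.
   
       import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint, toRedisContext}
       import org.apache.spark.SparkContext
       import org.apache.spark.sql.{Dataset, Row}
   
       // Build one explicit RedisConfig so the password reaches every write path.
       implicit val redisConfigs: RedisConfig = new RedisConfig(RedisEndpoint(
         host = "redis.example.com", // config.getString(redisHost)
         port = 6379,                // config.getInt(redisPort)
         auth = "my-password",       // config.getString(redisAuth): the actual fix
         dbNum = 0,                  // config.getInt(redisDb)
         timeout = 2000))            // config.getInt(redisTimeout)
   
       // Illustrative KV writer: the first two columns become key and value.
       def dealWithKV(data: Dataset[Row])(implicit sc: SparkContext, redisConfig: RedisConfig): Unit = {
         val kvs = data.rdd.map(row => (row.get(0).toString, row.get(1).toString))
         sc.toRedisKV(kvs) // toRedisContext picks up the implicit RedisConfig, so auth is applied
       }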







[GitHub] [incubator-seatunnel] simon824 commented on a change in pull request #1136: [Bug] [seatunnel-connector-spark-redis] Solve user unauthorized problem and code optimization

Posted by GitBox <gi...@apache.org>.
simon824 commented on a change in pull request #1136:
URL: https://github.com/apache/incubator-seatunnel/pull/1136#discussion_r790578074



##########
File path: seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
##########

 (Quotes the same diff hunk reproduced in full in wuchunfu's reply above.)

Review comment:
       we can remove `checkParam()`
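    
    The suggestion holds because CheckConfigUtil.checkAllExists already builds the
    same "missing keys" CheckResult that checkParam assembled by hand. A minimal
    sketch of the consolidated check (CheckConfigUtil's behavior is assumed from
    its usage in the diff; note that the diff itself discards the result of
    checkAllExists, while this sketch returns it):
    
        override def checkConfig(): CheckResult = {
          // Report all missing required keys in one CheckResult.
          val existsCheck = CheckConfigUtil.checkAllExists(config, "redis_host", "redis_port", "redis_save_type")
          if (!existsCheck.isSuccess) {
            existsCheck
          } else {
            val saveType = config.getString("redis_save_type").toUpperCase
            if (List("KV", "HASH", "SET", "ZSET", "LIST").contains(saveType)) {
              redisSaveType = RedisSaveType.withName(saveType)
              CheckResult.success()
            } else {
              CheckResult.error("Unknown redis config. redis_save_type must be in [KV HASH SET ZSET LIST]")
            }
          }
        }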







[GitHub] [incubator-seatunnel] CalvinKirs merged pull request #1136: [Bug] [seatunnel-connector-spark-redis] Solve user unauthorized problem and code optimization

Posted by GitBox <gi...@apache.org>.
CalvinKirs merged pull request #1136:
URL: https://github.com/apache/incubator-seatunnel/pull/1136


   





[GitHub] [incubator-seatunnel] CalvinKirs commented on pull request #1136: [Bug] [seatunnel-connector-spark-redis] Solve user unauthorized problem and code optimization

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on pull request #1136:
URL: https://github.com/apache/incubator-seatunnel/pull/1136#issuecomment-1020198823


   It would be better to update your title (support Redis authentication).

