Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2021/12/30 12:31:05 UTC

[GitHub] [incubator-seatunnel] LiuWenSheng opened a new pull request #907: [feature] [spark plugin] add redis sink #733

LiuWenSheng opened a new pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907


   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. For new features or big changes,
   remember to adjust the documentation as well.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[SeaTunnel #XXXX] [component] Title of the pull request", where *SeaTunnel #XXXX* should be replaced by the actual issue number.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
Add a Redis sink plugin for Spark.
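
For reference, a minimal sink configuration might look like the sketch below. It is only an illustration: the option names (`redis_host`, `redis_port`, `redis_save_type`, `redis_hash_name`) come from this PR's `checkConfig`, the block naming follows SeaTunnel's usual sink config conventions, and the host, port, and key name are placeholder values.

```
sink {
  Redis {
    redis_host = "localhost"
    redis_port = 6379
    redis_save_type = "HASH"
    redis_hash_name = "user_hash"
  }
}
```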
   
   ## Check list
   
   * [ ] Code changes are covered with tests, or the change does not need tests for this reason:
   * [ ] The change does not need a documentation change, or I will submit a documentation change to https://github.com/apache/incubator-seatunnel-website later
   





[GitHub] [incubator-seatunnel] an-shi-chi-fan commented on a change in pull request #907: [Feature] [SparkPlugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
an-shi-chi-fan commented on a change in pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#discussion_r779258066



##########
File path: seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.spark.sink
+
+import com.redislabs.provider.redis.toRedisContext
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.internal.Logging
+
+import scala.collection.JavaConverters.asScalaSetConverter
+import scala.collection.mutable
+
+class Redis extends SparkBatchSink with Logging {
+
+  val redisConfig: mutable.Map[String, String] = mutable.Map()
+  val redisPrefix = "redis"
+  var redisSaveType: RedisSaveType.Value = _
+  val HASH_NAME = "redis_hash_name"
+  val SET_NAME = "redis_set_name"
+  val ZSET_NAME = "redis_zset_name"
+  val LIST_NAME = "redis_list_name"
+  val REDIS_SAVE_TYPE = "redis_save_type"
+
+  override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
+    implicit val sc: SparkContext = env.getSparkSession.sparkContext
+    redisSaveType match {
+      case RedisSaveType.KV => dealWithKV(data)
+      case RedisSaveType.HASH => dealWithHASH(data, redisConfig(HASH_NAME))
+      case RedisSaveType.SET => dealWithSet(data, redisConfig(SET_NAME))
+      case RedisSaveType.ZSET => dealWithZSet(data, redisConfig(ZSET_NAME))
+      case RedisSaveType.LIST => dealWithList(data, redisConfig(LIST_NAME))
+    }
+  }
+
+  override def checkConfig(): CheckResult = {
+    config.entrySet().asScala.filter(x => x.getKey.startsWith("redis")).foreach(entry => {
+      if (entry.getKey.equals("redis_save_type")) {
+        redisSaveType = RedisSaveType.withName(config.getString("redis_save_type"))
+      }
+      redisConfig.put(entry.getKey, config.getString(entry.getKey))
+    })
+
+    def checkParam(checkArr: Array[String]): CheckResult = {
+      val notExistConfig: Array[String] = checkArr.filter(checkItem => !config.hasPath(checkItem))
+      if (notExistConfig.isEmpty) {
+        new CheckResult(true, "redis config is enough")
+      } else {
+        new CheckResult(false, s"redis config is not enough please check config [${notExistConfig.mkString(",")}]")
+      }
+    }
+
+    val result = checkParam(Array("redis_save_type", "redis_host", "redis_port"))
+    if (!result.isSuccess) {
+      result
+    } else {
+      redisSaveType match {
+        case RedisSaveType.KV => checkParam(Array())
+        case RedisSaveType.HASH => checkParam(Array(HASH_NAME))
+        case RedisSaveType.SET => checkParam(Array(SET_NAME))
+        case RedisSaveType.ZSET => checkParam(Array(ZSET_NAME))
+        case RedisSaveType.LIST => checkParam(Array(LIST_NAME))
+        case _ => new CheckResult(false, "unknown redis config RedisSaveType must be in [KV HASH SET ZSET LIST]")

Review comment:
       @RickyHuo It has been fixed.







[GitHub] [incubator-seatunnel] an-shi-chi-fan commented on a change in pull request #907: [Feature] [SparkPlugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
an-shi-chi-fan commented on a change in pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#discussion_r778078013



##########
File path: seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package  org.apache.seatunnel.spark.sink;
+
+import com.redislabs.provider.redis.toRedisContext
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.internal.Logging
+
+import scala.collection.JavaConverters.asScalaSetConverter
+import scala.collection.mutable.Map
+
+class Redis extends SparkBatchSink with Logging {
+
+  val redisConfig: Map[String, String] = Map()
+  val redisPrefix = "redis"
+  var redisSaveType: RedisSaveType.Value = _
+  val HASH_NAME = "redis_hash_name"
+  val SET_NAME = "redis_set_name"
+  val ZSET_NAME = "redis_zset_name"
+  val LIST_NAME = "redis_list_name"
+  val REDIS_SAVE_TYPE = "redis_save_type"
+
+  override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
+    implicit val sc = env.getSparkSession.sparkContext
+    redisSaveType match {
+      case RedisSaveType.KV => dealWithKV(data)
+      case RedisSaveType.HASH => dealWithHASH(data, redisConfig.get(HASH_NAME).get)
+      case RedisSaveType.SET => dealWithSet(data, redisConfig.get(SET_NAME).get)
+      case RedisSaveType.ZSET => dealWithZSet(data, redisConfig.get(ZSET_NAME).get)
+      case RedisSaveType.LIST => dealWithList(data, redisConfig.get(LIST_NAME).get)
+    }
+  }
+
+  override def checkConfig(): CheckResult = {
+    config.entrySet().asScala.filter(x => x.getKey.startsWith("redis")).foreach(entry => {
+      if (entry.getKey.equals("redis_save_type")) {
+        redisSaveType = RedisSaveType.withName(config.getString("redis_save_type"))
+      }
+      redisConfig.put(entry.getKey, config.getString(entry.getKey))
+    })
+
+    def checkParam(checkArr: Array[String]): CheckResult = {
+      val notExistConfig: Array[String] = checkArr.filter(checkItem => !config.hasPath(checkItem))
+      if (notExistConfig.isEmpty) {
+        new CheckResult(true, "redis config is enough")
+      } else {
+        new CheckResult(false, s"redis config is not enough please check config [${notExistConfig.mkString(",")}]")
+      }
+    }
+
+    val result = checkParam(Array("redis_save_type", "redis_host", "redis_port"))
+    if (!result.isSuccess) {
+      result
+    } else {
+      redisSaveType match {
+        case RedisSaveType.KV => checkParam(Array())
+        case RedisSaveType.HASH => checkParam(Array(HASH_NAME))
+        case RedisSaveType.SET => checkParam(Array(SET_NAME))
+        case RedisSaveType.ZSET => checkParam(Array(ZSET_NAME))
+        case RedisSaveType.LIST => checkParam(Array(LIST_NAME))
+        case _ => new CheckResult(false, "unknown redis config")

Review comment:
       It has been fixed; please take a look.







[GitHub] [incubator-seatunnel] an-shi-chi-fan commented on a change in pull request #907: [Feature] [SparkPlugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
an-shi-chi-fan commented on a change in pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#discussion_r778078868



##########
File path: seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package  org.apache.seatunnel.spark.sink;
+
+import com.redislabs.provider.redis.toRedisContext
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.internal.Logging
+
+import scala.collection.JavaConverters.asScalaSetConverter
+import scala.collection.mutable.Map
+
+class Redis extends SparkBatchSink with Logging {
+
+  val redisConfig: Map[String, String] = Map()
+  val redisPrefix = "redis"
+  var redisSaveType: RedisSaveType.Value = _
+  val HASH_NAME = "redis_hash_name"
+  val SET_NAME = "redis_set_name"
+  val ZSET_NAME = "redis_zset_name"
+  val LIST_NAME = "redis_list_name"
+  val REDIS_SAVE_TYPE = "redis_save_type"
+
+  override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
+    implicit val sc = env.getSparkSession.sparkContext
+    redisSaveType match {
+      case RedisSaveType.KV => dealWithKV(data)
+      case RedisSaveType.HASH => dealWithHASH(data, redisConfig.get(HASH_NAME).get)
+      case RedisSaveType.SET => dealWithSet(data, redisConfig.get(SET_NAME).get)
+      case RedisSaveType.ZSET => dealWithZSet(data, redisConfig.get(ZSET_NAME).get)
+      case RedisSaveType.LIST => dealWithList(data, redisConfig.get(LIST_NAME).get)
+    }
+  }
+
+  override def checkConfig(): CheckResult = {
+    config.entrySet().asScala.filter(x => x.getKey.startsWith("redis")).foreach(entry => {
+      if (entry.getKey.equals("redis_save_type")) {
+        redisSaveType = RedisSaveType.withName(config.getString("redis_save_type"))
+      }
+      redisConfig.put(entry.getKey, config.getString(entry.getKey))
+    })
+
+    def checkParam(checkArr: Array[String]): CheckResult = {
+      val notExistConfig: Array[String] = checkArr.filter(checkItem => !config.hasPath(checkItem))
+      if (notExistConfig.isEmpty) {
+        new CheckResult(true, "redis config is enough")
+      } else {
+        new CheckResult(false, s"redis config is not enough please check config [${notExistConfig.mkString(",")}]")
+      }
+    }
+
+    val result = checkParam(Array("redis_save_type", "redis_host", "redis_port"))
+    if (!result.isSuccess) {
+      result
+    } else {
+      redisSaveType match {
+        case RedisSaveType.KV => checkParam(Array())
+        case RedisSaveType.HASH => checkParam(Array(HASH_NAME))
+        case RedisSaveType.SET => checkParam(Array(SET_NAME))
+        case RedisSaveType.ZSET => checkParam(Array(ZSET_NAME))
+        case RedisSaveType.LIST => checkParam(Array(LIST_NAME))
+        case _ => new CheckResult(false, "unknown redis config")
+      }
+    }
+  }
+
+  override def prepare(prepareEnv: SparkEnvironment): Unit = {
+
+    val conf = prepareEnv.getSparkSession.conf
+    conf.set("spark.redis.host", config.getString("redis_host"))
+    conf.set("spark.redis.port", config.getString("redis_port"))
+    if (config.hasPath("redis.auth")) {
+      conf.set("spark.redis.auth", config.getString("redis.auth")) // read the configured password, not a literal
+    }
+  }
+
+
+  def dealWithKV(data: Dataset[Row])(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0), x.getString(1)))
+    sc.toRedisKV(value)
+  }
+
+  def dealWithList(data: Dataset[Row], listName: String)(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0)))
+    sc.toRedisLIST(value, listName)
+  }
+
+  def dealWithSet(data: Dataset[Row], setName: String)(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0)))
+    sc.toRedisSET(value, setName)
+  }
+
+  def dealWithZSet(data: Dataset[Row], setName: String)(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0), x.getString(1)))
+    sc.toRedisZSET(value, setName)
+  }
+
+  def dealWithHASH(data: Dataset[Row], hashName: String)(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0), x.getString(1)))
+    sc.toRedisHASH(value, hashName)
+  }
+}
+
+
+object RedisSaveType extends Enumeration {
+  def RedisSaveType = Value
+
+  val KV = Value("KV")
+  val HASH = Value("HASH")
+  val LIST = Value("LIST")
+  val SET = Value("SET")
+  val ZSET = Value("ZSET")
+}

Review comment:
       It has been fixed; please take a look.







[GitHub] [incubator-seatunnel] an-shi-chi-fan commented on pull request #907: [Feature] [SparkPlugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
an-shi-chi-fan commented on pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#issuecomment-1004795572


   @CalvinKirs @zixi0825 I have fixed the Scala style issues; please take a look.





[GitHub] [incubator-seatunnel] an-shi-chi-fan commented on a change in pull request #907: [Feature] [SparkPlugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
an-shi-chi-fan commented on a change in pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#discussion_r779394574



##########
File path: docs/en/configuration/sink-plugins/Redis.md
##########
@@ -0,0 +1,57 @@
+# Sink plugin: Redis
+
+### Description
+
+Write Rows to a Redis.
+
+### Options
+
+| name | type | required | default value | engine |

Review comment:
       It has been fixed.







[GitHub] [incubator-seatunnel] RickyHuo commented on a change in pull request #907: [Feature] [SparkPlugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
RickyHuo commented on a change in pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#discussion_r777092813



##########
File path: seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package  org.apache.seatunnel.spark.sink;
+
+import com.redislabs.provider.redis.toRedisContext
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.internal.Logging
+
+import scala.collection.JavaConverters.asScalaSetConverter
+import scala.collection.mutable.Map
+
+class Redis extends SparkBatchSink with Logging {
+
+  val redisConfig: Map[String, String] = Map()
+  val redisPrefix = "redis"
+  var redisSaveType: RedisSaveType.Value = _
+  val HASH_NAME = "redis_hash_name"
+  val SET_NAME = "redis_set_name"
+  val ZSET_NAME = "redis_zset_name"
+  val LIST_NAME = "redis_list_name"
+  val REDIS_SAVE_TYPE = "redis_save_type"
+
+  override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
+    implicit val sc = env.getSparkSession.sparkContext
+    redisSaveType match {
+      case RedisSaveType.KV => dealWithKV(data)
+      case RedisSaveType.HASH => dealWithHASH(data, redisConfig.get(HASH_NAME).get)
+      case RedisSaveType.SET => dealWithSet(data, redisConfig.get(SET_NAME).get)
+      case RedisSaveType.ZSET => dealWithZSet(data, redisConfig.get(ZSET_NAME).get)
+      case RedisSaveType.LIST => dealWithList(data, redisConfig.get(LIST_NAME).get)
+    }
+  }
+
+  override def checkConfig(): CheckResult = {
+    config.entrySet().asScala.filter(x => x.getKey.startsWith("redis")).foreach(entry => {
+      if (entry.getKey.equals("redis_save_type")) {
+        redisSaveType = RedisSaveType.withName(config.getString("redis_save_type"))
+      }
+      redisConfig.put(entry.getKey, config.getString(entry.getKey))
+    })
+
+    def checkParam(checkArr: Array[String]): CheckResult = {
+      val notExistConfig: Array[String] = checkArr.filter(checkItem => !config.hasPath(checkItem))
+      if (notExistConfig.isEmpty) {
+        new CheckResult(true, "redis config is enough")
+      } else {
+        new CheckResult(false, s"redis config is not enough please check config [${notExistConfig.mkString(",")}]")
+      }
+    }
+
+    val result = checkParam(Array("redis_save_type", "redis_host", "redis_port"))
+    if (!result.isSuccess) {
+      result
+    } else {
+      redisSaveType match {
+        case RedisSaveType.KV => checkParam(Array())
+        case RedisSaveType.HASH => checkParam(Array(HASH_NAME))
+        case RedisSaveType.SET => checkParam(Array(SET_NAME))
+        case RedisSaveType.ZSET => checkParam(Array(ZSET_NAME))
+        case RedisSaveType.LIST => checkParam(Array(LIST_NAME))
+        case _ => new CheckResult(false, "unknown redis config")

Review comment:
       This error message should be clearer.







[GitHub] [incubator-seatunnel] wuchunfu commented on a change in pull request #907: [Feature] [SparkPlugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
wuchunfu commented on a change in pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#discussion_r776723890



##########
File path: seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package  org.apache.seatunnel.spark.sink;
+
+import com.redislabs.provider.redis.toRedisContext
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.internal.Logging
+
+import scala.collection.JavaConverters.asScalaSetConverter
+import scala.collection.mutable.Map
+
+class Redis extends SparkBatchSink with Logging {
+
+  val redisConfig: Map[String, String] = Map()
+  val redisPrefix = "redis"
+  var redisSaveType: RedisSaveType.Value = _
+  val HASH_NAME = "redis_hash_name"
+  val SET_NAME = "redis_set_name"
+  val ZSET_NAME = "redis_zset_name"
+  val LIST_NAME = "redis_list_name"
+  val REDIS_SAVE_TYPE = "redis_save_type"
+
+  override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
+    implicit val sc = env.getSparkSession.sparkContext
+    redisSaveType match {
+      case RedisSaveType.KV => dealWithKV(data)
+      case RedisSaveType.HASH => dealWithHASH(data, redisConfig.get(HASH_NAME).get)
+      case RedisSaveType.SET => dealWithSet(data, redisConfig.get(SET_NAME).get)
+      case RedisSaveType.ZSET => dealWithZSet(data, redisConfig.get(ZSET_NAME).get)
+      case RedisSaveType.LIST => dealWithList(data, redisConfig.get(LIST_NAME).get)
+    }
+  }
+
+  override def checkConfig(): CheckResult = {
+    config.entrySet().asScala.filter(x => x.getKey.startsWith("redis")).foreach(entry => {
+      if (entry.getKey.equals("redis_save_type")) {
+        redisSaveType = RedisSaveType.withName(config.getString("redis_save_type"))
+      }
+      redisConfig.put(entry.getKey, config.getString(entry.getKey))
+    })
+
+    def checkParam(checkArr: Array[String]): CheckResult = {
+      val notExistConfig: Array[String] = checkArr.filter(checkItem => !config.hasPath(checkItem))
+      if (notExistConfig.isEmpty) {
+        new CheckResult(true, "redis config is enough")
+      } else {
+        new CheckResult(false, s"redis config is not enough please check config [${notExistConfig.mkString(",")}]")
+      }
+    }
+
+    val result = checkParam(Array("redis_save_type", "redis_host", "redis_port"))
+    if (!result.isSuccess) {
+      result
+    } else {
+      redisSaveType match {
+        case RedisSaveType.KV => checkParam(Array())
+        case RedisSaveType.HASH => checkParam(Array(HASH_NAME))
+        case RedisSaveType.SET => checkParam(Array(SET_NAME))
+        case RedisSaveType.ZSET => checkParam(Array(ZSET_NAME))
+        case RedisSaveType.LIST => checkParam(Array(LIST_NAME))
+        case _ => new CheckResult(false, "unknown redis config")
+      }
+    }
+  }
+
+  override def prepare(prepareEnv: SparkEnvironment): Unit = {
+
+    val conf = prepareEnv.getSparkSession.conf
+    conf.set("spark.redis.host", config.getString("redis_host"))
+    conf.set("spark.redis.port", config.getString("redis_port"))
+    if (config.hasPath("redis.auth")) {
+      conf.set("spark.redis.auth", config.getString("redis.auth")) // read the configured password, not a literal
+    }
+  }
+
+
+  def dealWithKV(data: Dataset[Row])(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0), x.getString(1)))
+    sc.toRedisKV(value)
+  }
+
+  def dealWithList(data: Dataset[Row], listName: String)(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0)))
+    sc.toRedisLIST(value, listName)
+  }
+
+  def dealWithSet(data: Dataset[Row], setName: String)(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0)))
+    sc.toRedisSET(value, setName)
+  }
+
+  def dealWithZSet(data: Dataset[Row], setName: String)(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0), x.getString(1)))
+    sc.toRedisZSET(value, setName)
+  }
+
+  def dealWithHASH(data: Dataset[Row], hashName: String)(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0), x.getString(1)))
+    sc.toRedisHASH(value, hashName)
+  }
+}
+
+
+object RedisSaveType extends Enumeration {
+  def RedisSaveType = Value
+
+  val KV = Value("KV")
+  val HASH = Value("HASH")
+  val LIST = Value("LIST")
+  val SET = Value("SET")
+  val ZSET = Value("ZSET")
+}

Review comment:
       The file must end with a newline character.







[GitHub] [incubator-seatunnel] wuchunfu commented on a change in pull request #907: [Feature] [SparkPlugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
wuchunfu commented on a change in pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#discussion_r776723740



##########
File path: seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package  org.apache.seatunnel.spark.sink;
+
+import com.redislabs.provider.redis.toRedisContext
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.internal.Logging
+
+import scala.collection.JavaConverters.asScalaSetConverter
+import scala.collection.mutable.Map
+
+class Redis extends SparkBatchSink with Logging {
+
+  val redisConfig: Map[String, String] = Map()
+  val redisPrefix = "redis"
+  var redisSaveType: RedisSaveType.Value = _
+  val HASH_NAME = "redis_hash_name"
+  val SET_NAME = "redis_set_name"
+  val ZSET_NAME = "redis_zset_name"
+  val LIST_NAME = "redis_list_name"
+  val REDIS_SAVE_TYPE = "redis_save_type"
+
+  override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
+    implicit val sc = env.getSparkSession.sparkContext
+    redisSaveType match {
+      case RedisSaveType.KV => dealWithKV(data)
+      case RedisSaveType.HASH => dealWithHASH(data, redisConfig.get(HASH_NAME).get)
+      case RedisSaveType.SET => dealWithSet(data, redisConfig.get(SET_NAME).get)
+      case RedisSaveType.ZSET => dealWithZSet(data, redisConfig.get(ZSET_NAME).get)
+      case RedisSaveType.LIST => dealWithList(data, redisConfig.get(LIST_NAME).get)
+    }
+  }
+
+  override def checkConfig(): CheckResult = {
+    config.entrySet().asScala.filter(x => x.getKey.startsWith("redis")).foreach(entry => {
+      if (entry.getKey.equals("redis_save_type")) {
+        redisSaveType = RedisSaveType.withName(config.getString("redis_save_type"))
+      }
+      redisConfig.put(entry.getKey, config.getString(entry.getKey))
+    })
+
+    def checkParam(checkArr: Array[String]): CheckResult = {
+      val notExistConfig: Array[String] = checkArr.filter(checkItem => !config.hasPath(checkItem))
+      if (notExistConfig.isEmpty) {
+        new CheckResult(true, "redis config is enough")
+      } else {
+        new CheckResult(false, s"redis config is not enough please check config [${notExistConfig.mkString(",")}]")
+      }
+    }
+
+    val result = checkParam(Array("redis_save_type", "redis_host", "redis_port"))
+    if (!result.isSuccess) {
+      result
+    } else {
+      redisSaveType match {
+        case RedisSaveType.KV => checkParam(Array())
+        case RedisSaveType.HASH => checkParam(Array(HASH_NAME))
+        case RedisSaveType.SET => checkParam(Array(SET_NAME))
+        case RedisSaveType.ZSET => checkParam(Array(ZSET_NAME))
+        case RedisSaveType.LIST => checkParam(Array(LIST_NAME))
+        case _ => new CheckResult(false, "unknown redis config")
+      }
+    }
+  }
+
+  override def prepare(prepareEnv: SparkEnvironment): Unit = {
+
+    val conf = prepareEnv.getSparkSession.conf
+    conf.set("spark.redis.host", config.getString("redis_host"))
+    conf.set("spark.redis.port", config.getString("redis_port"))
+    if (config.hasPath("redis.auth")) {
+      conf.set("spark.redis.auth", config.getString("redis.auth")) // read the configured password, not a literal
+    }
+  }
+
+
+  def dealWithKV(data: Dataset[Row])(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0), x.getString(1)))
+    sc.toRedisKV(value)
+  }
+
+  def dealWithList(data: Dataset[Row], listName: String)(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0)))
+    sc.toRedisLIST(value, listName)
+  }
+
+  def dealWithSet(data: Dataset[Row], setName: String)(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0)))
+    sc.toRedisSET(value, setName)
+  }
+
+  def dealWithZSet(data: Dataset[Row], setName: String)(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0), x.getString(1)))
+    sc.toRedisZSET(value, setName)
+  }
+
+  def dealWithHASH(data: Dataset[Row], hashName: String)(implicit sc: SparkContext): Unit = {
+    val value = data.rdd.map(x => (x.getString(0), x.getString(1)))
+    sc.toRedisHASH(value, hashName)
+  }
+}
+
+
+object RedisSaveType extends Enumeration {
+  def RedisSaveType = Value

Review comment:
       Public methods must have an explicit return type.
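
   A minimal sketch of one way to satisfy the check: drop the `def RedisSaveType = Value` line, which also looks unintended (each call would mint a new anonymous enum value), and give the public members explicit types:

```scala
object RedisSaveType extends Enumeration {
  val KV: Value = Value("KV")
  val HASH: Value = Value("HASH")
  val LIST: Value = Value("LIST")
  val SET: Value = Value("SET")
  val ZSET: Value = Value("ZSET")
}
```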







[GitHub] [incubator-seatunnel] CalvinKirs commented on pull request #907: [feature] [spark plugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#issuecomment-1003020945


    It seems the Scala code style check failed; can you try to fix it locally?





[GitHub] [incubator-seatunnel] CalvinKirs commented on pull request #907: [Feature] [SparkPlugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#issuecomment-1003039837


   Can we close #778?





[GitHub] [incubator-seatunnel] RickyHuo commented on a change in pull request #907: [Feature] [SparkPlugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
RickyHuo commented on a change in pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#discussion_r779268716



##########
File path: docs/en/configuration/sink-plugins/Redis.md
##########
@@ -0,0 +1,57 @@
+# Sink plugin: Redis
+
+### Description
+
+Write Rows to a Redis.
+
+### Options
+
+| name | type | required | default value | engine |
+| --- | --- | --- | --- | --- |
+| [redis_host](#redis-host-string) | string | yes | "localhost" | all streaming |

Review comment:
       ```
   | [redis_host](#redis_host-string) | string | yes | "localhost" | all streaming |
   ```
   You should write it like this; otherwise the link would not resolve: GitHub keeps underscores when generating heading anchors (it lowercases the heading and replaces spaces with hyphens), so the anchor for the `redis_host [string]` heading is `#redis_host-string`, not `#redis-host-string`.

##########
File path: docs/en/configuration/sink-plugins/Redis.md
##########
@@ -0,0 +1,57 @@
+# Sink plugin: Redis
+
+### Description
+
+Write Rows to a Redis.
+
+### Options
+
+| name | type | required | default value | engine |

Review comment:
       Please remove the `engine` column.
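
   With that column removed, the header rows would read:

```
| name | type | required | default value |
| --- | --- | --- | --- |
```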







[GitHub] [incubator-seatunnel] RickyHuo merged pull request #907: [Feature] [SparkPlugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
RickyHuo merged pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907


   





[GitHub] [incubator-seatunnel] CalvinKirs commented on pull request #907: [feature] [spark plugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#issuecomment-1003021464


   This is the check command: `mvn -B package scalastyle:check`.





[GitHub] [incubator-seatunnel] LiuWenSheng commented on pull request #907: [feature] [spark plugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
LiuWenSheng commented on pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#issuecomment-1003010341


   @CalvinKirs @zixi0825 Please take a look.





[GitHub] [incubator-seatunnel] RickyHuo commented on a change in pull request #907: [Feature] [SparkPlugin] add redis sink #733

Posted by GitBox <gi...@apache.org>.
RickyHuo commented on a change in pull request #907:
URL: https://github.com/apache/incubator-seatunnel/pull/907#discussion_r778604560



##########
File path: seatunnel-connectors/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/sink/Redis.scala
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.spark.sink
+
+import com.redislabs.provider.redis.toRedisContext
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.internal.Logging
+
+import scala.collection.JavaConverters.asScalaSetConverter
+import scala.collection.mutable
+
+class Redis extends SparkBatchSink with Logging {
+
+  val redisConfig: mutable.Map[String, String] = mutable.Map()
+  val redisPrefix = "redis"
+  var redisSaveType: RedisSaveType.Value = _
+  val HASH_NAME = "redis_hash_name"
+  val SET_NAME = "redis_set_name"
+  val ZSET_NAME = "redis_zset_name"
+  val LIST_NAME = "redis_list_name"
+  val REDIS_SAVE_TYPE = "redis_save_type"
+
+  override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
+    implicit val sc: SparkContext = env.getSparkSession.sparkContext
+    redisSaveType match {
+      case RedisSaveType.KV => dealWithKV(data)
+      case RedisSaveType.HASH => dealWithHASH(data, redisConfig(HASH_NAME))
+      case RedisSaveType.SET => dealWithSet(data, redisConfig(SET_NAME))
+      case RedisSaveType.ZSET => dealWithZSet(data, redisConfig(ZSET_NAME))
+      case RedisSaveType.LIST => dealWithList(data, redisConfig(LIST_NAME))
+    }
+  }
+
+  override def checkConfig(): CheckResult = {
+    config.entrySet().asScala.filter(x => x.getKey.startsWith("redis")).foreach(entry => {
+      if (entry.getKey.equals("redis_save_type")) {
+        redisSaveType = RedisSaveType.withName(config.getString("redis_save_type"))
+      }
+      redisConfig.put(entry.getKey, config.getString(entry.getKey))
+    })
+
+    def checkParam(checkArr: Array[String]): CheckResult = {
+      val notExistConfig: Array[String] = checkArr.filter(checkItem => !config.hasPath(checkItem))
+      if (notExistConfig.isEmpty) {
+        new CheckResult(true, "redis config is enough")
+      } else {
+        new CheckResult(false, s"redis config is not enough please check config [${notExistConfig.mkString(",")}]")
+      }
+    }
+
+    val result = checkParam(Array("redis_save_type", "redis_host", "redis_port"))
+    if (!result.isSuccess) {
+      result
+    } else {
+      redisSaveType match {
+        case RedisSaveType.KV => checkParam(Array())
+        case RedisSaveType.HASH => checkParam(Array(HASH_NAME))
+        case RedisSaveType.SET => checkParam(Array(SET_NAME))
+        case RedisSaveType.ZSET => checkParam(Array(ZSET_NAME))
+        case RedisSaveType.LIST => checkParam(Array(LIST_NAME))
+        case _ => new CheckResult(false, "unknown redis config RedisSaveType must be in [KV HASH SET ZSET LIST]")

Review comment:
       Suggested message: `Unknown redis config. redis_save_type must be in [KV HASH SET ZSET LIST]`

   Mentioning `RedisSaveType` here could be misread as the Scala type rather than the `redis_save_type` config key.
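
   Applied to the match arm quoted above, the suggested wording would look like this sketch:

```scala
case _ => new CheckResult(false, "Unknown redis config. redis_save_type must be in [KV HASH SET ZSET LIST]")
```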



