You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/06 10:42:53 UTC

[incubator-seatunnel] branch dev updated: [Improvement][connector-spark-tidb] Refactored config parameters (#1983)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 14e4d242 [Improvement][connector-spark-tidb] Refactored config parameters (#1983)
14e4d242 is described below

commit 14e4d2425d45f08e2e59dbee96aefd2f708b514c
Author: mans2singh <ma...@users.noreply.github.com>
AuthorDate: Mon Jun 6 06:42:50 2022 -0400

    [Improvement][connector-spark-tidb] Refactored config parameters (#1983)
---
 .../spark/tidb/{source/Tidb.scala => Config.scala} | 52 +++++++++++++++-------
 .../apache/seatunnel/spark/tidb/sink/Tidb.scala    | 15 ++++---
 .../apache/seatunnel/spark/tidb/source/Tidb.scala  |  7 +--
 3 files changed, 47 insertions(+), 27 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/Config.scala
similarity index 51%
copy from seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
copy to seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/Config.scala
index 99a6b684..4c6cb9f6 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/Config.scala
@@ -14,28 +14,46 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.seatunnel.spark.tidb
 
-package org.apache.seatunnel.spark.tidb.source
+/**
+ * Configuration parameters for TiDB source and sink
+ */
+object Config extends Serializable {
+
+  /**
+   * Address config parameter
+   */
+  val ADDR = "addr"
+
+  /**
+   * Port config parameter
+   */
+  val PORT = "port"
 
-import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.SparkEnvironment
-import org.apache.seatunnel.spark.batch.SparkBatchSource
-import org.apache.spark.sql.{Dataset, Row}
+  /**
+   * User config parameter
+   */
+  val USER = "user"
 
-class Tidb extends SparkBatchSource {
+  /**
+   * Password config parameter
+   */
+  val PASSWORD = "password"
 
-  override def prepare(env: SparkEnvironment): Unit = {}
+  /**
+   * Database config parameter
+   */
+  val DATABASE = "database"
 
-  override def checkConfig(): CheckResult = {
-    checkAllExists(config, "pre_sql", "database")
-  }
+  /**
+   * Table config parameter
+   */
+  val TABLE = "table"
 
-  override def getData(env: SparkEnvironment): Dataset[Row] = {
-    val spark = env.getSparkSession
-    spark.sql("use " + config.getString("database"))
-    spark.sql(config.getString("pre_sql"))
-  }
+  /**
+   * Pre-SQL config parameter (the SQL statement the source executes to read data)
+   */
+  val PRE_SQL = "pre_sql"
 
-  override def getPluginName: String = "TiDB"
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
index 64bfed39..88706d81 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.seatunnel.spark.tidb.Config.{ADDR, PASSWORD, PORT, USER, DATABASE, TABLE}
 import org.apache.spark.sql.{Dataset, Row}
 
 import scala.collection.JavaConversions._
@@ -33,12 +34,12 @@ class Tidb extends SparkBatchSink {
     val writer = data.write
       .format("tidb")
       .mode("append")
-      .option("tidb.addr", config.getString("addr"))
-      .option("tidb.password", config.getString("password"))
-      .option("tidb.port", config.getString("port"))
-      .option("tidb.user", config.getString("user"))
-      .option("database", config.getString("database"))
-      .option("table", config.getString("table"))
+      .option("tidb.addr", config.getString(ADDR))
+      .option("tidb.password", config.getString(PASSWORD))
+      .option("tidb.port", config.getString(PORT))
+      .option("tidb.user", config.getString(USER))
+      .option("database", config.getString(DATABASE))
+      .option("table", config.getString(TABLE))
 
     Try(TypesafeConfigUtils.extractSubConfigThrowable(config, "options.", false)) match {
 
@@ -57,7 +58,7 @@ class Tidb extends SparkBatchSink {
   }
 
   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "addr", "port", "database", "table", "user", "password")
+    checkAllExists(config, ADDR, PORT, DATABASE, TABLE, USER, PASSWORD)
   }
 
   override def getPluginName: String = "TiDB"
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
index 99a6b684..be95dab3 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSource
+import org.apache.seatunnel.spark.tidb.Config.{DATABASE, PRE_SQL}
 import org.apache.spark.sql.{Dataset, Row}
 
 class Tidb extends SparkBatchSource {
@@ -28,13 +29,13 @@ class Tidb extends SparkBatchSource {
   override def prepare(env: SparkEnvironment): Unit = {}
 
   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "pre_sql", "database")
+    checkAllExists(config, PRE_SQL, DATABASE)
   }
 
   override def getData(env: SparkEnvironment): Dataset[Row] = {
     val spark = env.getSparkSession
-    spark.sql("use " + config.getString("database"))
-    spark.sql(config.getString("pre_sql"))
+    spark.sql("use " + config.getString(DATABASE))
+    spark.sql(config.getString(PRE_SQL))
   }
 
   override def getPluginName: String = "TiDB"