You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/06 10:42:53 UTC
[incubator-seatunnel] branch dev updated: [Improvement][connector-spark-tidb] Refactored config parameters (#1983)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 14e4d242 [Improvement][connector-spark-tidb] Refactored config parameters (#1983)
14e4d242 is described below
commit 14e4d2425d45f08e2e59dbee96aefd2f708b514c
Author: mans2singh <ma...@users.noreply.github.com>
AuthorDate: Mon Jun 6 06:42:50 2022 -0400
[Improvement][connector-spark-tidb] Refactored config parameters (#1983)
---
.../spark/tidb/{source/Tidb.scala => Config.scala} | 52 +++++++++++++++-------
.../apache/seatunnel/spark/tidb/sink/Tidb.scala | 15 ++++---
.../apache/seatunnel/spark/tidb/source/Tidb.scala | 7 +--
3 files changed, 47 insertions(+), 27 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/Config.scala
similarity index 51%
copy from seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
copy to seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/Config.scala
index 99a6b684..4c6cb9f6 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/Config.scala
@@ -14,28 +14,46 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.seatunnel.spark.tidb
-package org.apache.seatunnel.spark.tidb.source
+/**
+ * Configuration parameters for TiDB source and sink
+ */
+object Config extends Serializable {
+
+ /**
+ * Address config parameter
+ */
+ val ADDR = "addr"
+
+ /**
+ * Port config parameter
+ */
+ val PORT = "port"
-import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.SparkEnvironment
-import org.apache.seatunnel.spark.batch.SparkBatchSource
-import org.apache.spark.sql.{Dataset, Row}
+ /**
+ * User config parameter
+ */
+ val USER = "user"
-class Tidb extends SparkBatchSource {
+ /**
+ * Password config parameter
+ */
+ val PASSWORD = "password"
- override def prepare(env: SparkEnvironment): Unit = {}
+ /**
+ * Database config parameter
+ */
+ val DATABASE = "database"
- override def checkConfig(): CheckResult = {
- checkAllExists(config, "pre_sql", "database")
- }
+ /**
+ * Table config parameter
+ */
+ val TABLE = "table"
- override def getData(env: SparkEnvironment): Dataset[Row] = {
- val spark = env.getSparkSession
- spark.sql("use " + config.getString("database"))
- spark.sql(config.getString("pre_sql"))
- }
+ /**
+ * Pre sql config parameter
+ */
+ val PRE_SQL = "pre_sql"
- override def getPluginName: String = "TiDB"
}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
index 64bfed39..88706d81 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.seatunnel.spark.tidb.Config.{ADDR, PASSWORD, PORT, USER, DATABASE, TABLE}
import org.apache.spark.sql.{Dataset, Row}
import scala.collection.JavaConversions._
@@ -33,12 +34,12 @@ class Tidb extends SparkBatchSink {
val writer = data.write
.format("tidb")
.mode("append")
- .option("tidb.addr", config.getString("addr"))
- .option("tidb.password", config.getString("password"))
- .option("tidb.port", config.getString("port"))
- .option("tidb.user", config.getString("user"))
- .option("database", config.getString("database"))
- .option("table", config.getString("table"))
+ .option("tidb.addr", config.getString(ADDR))
+ .option("tidb.password", config.getString(PASSWORD))
+ .option("tidb.port", config.getString(PORT))
+ .option("tidb.user", config.getString(USER))
+ .option("database", config.getString(DATABASE))
+ .option("table", config.getString(TABLE))
Try(TypesafeConfigUtils.extractSubConfigThrowable(config, "options.", false)) match {
@@ -57,7 +58,7 @@ class Tidb extends SparkBatchSink {
}
override def checkConfig(): CheckResult = {
- checkAllExists(config, "addr", "port", "database", "table", "user", "password")
+ checkAllExists(config, ADDR, PORT, DATABASE, TABLE, USER, PASSWORD)
}
override def getPluginName: String = "TiDB"
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
index 99a6b684..be95dab3 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
+import org.apache.seatunnel.spark.tidb.Config.{DATABASE, PRE_SQL}
import org.apache.spark.sql.{Dataset, Row}
class Tidb extends SparkBatchSource {
@@ -28,13 +29,13 @@ class Tidb extends SparkBatchSource {
override def prepare(env: SparkEnvironment): Unit = {}
override def checkConfig(): CheckResult = {
- checkAllExists(config, "pre_sql", "database")
+ checkAllExists(config, PRE_SQL, DATABASE)
}
override def getData(env: SparkEnvironment): Dataset[Row] = {
val spark = env.getSparkSession
- spark.sql("use " + config.getString("database"))
- spark.sql(config.getString("pre_sql"))
+ spark.sql("use " + config.getString(DATABASE))
+ spark.sql(config.getString(PRE_SQL))
}
override def getPluginName: String = "TiDB"