You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ri...@apache.org on 2022/01/23 08:06:29 UTC

[incubator-seatunnel] branch dev updated: [Improve][seatunnel-connector-spark-mongodb] Optimize the mongodb source connector (#1133)

This is an automated email from the ASF dual-hosted git repository.

rickyhuo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b9f7703  [Improve][seatunnel-connector-spark-mongodb] Optimize the mongodb source connector (#1133)
b9f7703 is described below

commit b9f7703d76a490414619e0af7d2bbc1cb115f37d
Author: wuchunfu <31...@qq.com>
AuthorDate: Sun Jan 23 16:06:22 2022 +0800

    [Improve][seatunnel-connector-spark-mongodb] Optimize the mongodb source connector (#1133)
---
 .../spark/configuration/source-plugins/MongoDB.md  | 29 ++++++++++++++++------
 .../apache/seatunnel/spark/source/MongoDB.scala    | 24 ++++++------------
 2 files changed, 28 insertions(+), 25 deletions(-)

diff --git a/docs/en/spark/configuration/source-plugins/MongoDB.md b/docs/en/spark/configuration/source-plugins/MongoDB.md
index f2dc764..2d78d35 100644
--- a/docs/en/spark/configuration/source-plugins/MongoDB.md
+++ b/docs/en/spark/configuration/source-plugins/MongoDB.md
@@ -6,11 +6,14 @@ Read data from MongoDB.
 
 ## Options
 
-| name           | type   | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| readconfig.uri            | string | yes      | -             |
-| readconfig.database       | string | yes      | -         |
-| readconfig.collection     | string | yes      | -             |
+| name                  | type   | required | default value |
+|-----------------------| ------ |----------|---------------|
+| readconfig.uri        | string | yes      | -             |
+| readconfig.database   | string | yes      | -             |
+| readconfig.collection | string | yes      | -             |
+| readconfig.*          | string | no       | -             |
+| schema                | string | no       | -             |
+| common-options        | string | yes      | -             |
 
 ### readconfig.uri [string]
 
@@ -24,6 +27,14 @@ MongoDB database
 
 MongoDB collection
 
+### readconfig.* [string]
+
+More other parameters can be configured here, see [MongoDB Configuration](https://docs.mongodb.com/spark-connector/current/configuration/) for details, see the Input Configuration section. The way to specify parameters is to prefix the original parameter name `readconfig.` For example, the way to set `spark.mongodb.input.partitioner` is `readconfig.spark.mongodb.input.partitioner="MongoPaginateBySizePartitioner"` . If you do not specify these optional parameters, the default values of th [...]
+
+### schema [string]
+
+Because `MongoDB` does not have the concept of `schema`, when spark reads `MongoDB` , it will sample `MongoDB` data and infer the `schema` . In fact, this process will be slow and may be inaccurate. This parameter can be manually specified. Avoid these problems. `schema` is a `json` string, such as `{\"name\":\"string\",\"age\":\"integer\",\"addrs\":{\"country\":\"string\ ",\"city\":\"string\"}}`
+
 ### common options [string]
 
 Source Plugin common parameters, refer to [Source Plugin](./source-plugin.md) for details
@@ -32,9 +43,11 @@ Source Plugin common parameters, refer to [Source Plugin](./source-plugin.md) fo
 
 ```bash
 mongodb {
-    readconfig.uri = "mongodb://username:password@192.168.0.1:27017/test"
-    readconfig.database = "test"
-    readconfig.collection = "collection1"
+    readconfig.uri = "mongodb://username:password@127.0.0.1:27017/mypost"
+    readconfig.database = "mydatabase"
+    readconfig.collection = "mycollection"
+    readconfig.spark.mongodb.input.partitioner = "MongoPaginateBySizePartitioner"
+    schema="{\"name\":\"string\",\"age\":\"integer\",\"addrs\":{\"country\":\"string\",\"city\":\"string\"}}"
     result_table_name = "mongodb_result_table"
 }
 ```
diff --git a/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala b/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala
index c177bcc..74d595e 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConversions._
 import com.alibaba.fastjson.JSON
 import com.mongodb.spark.MongoSpark
 import com.mongodb.spark.config.ReadConfig
-import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
+import org.apache.seatunnel.common.config.{CheckConfigUtil, CheckResult, TypesafeConfigUtils}
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSource
 import org.apache.seatunnel.spark.utils.SparkStructTypeUtil
@@ -47,13 +47,12 @@ class MongoDB extends SparkBatchSource {
         val value = String.valueOf(entry.getValue.unwrapped())
         map.put(key, value)
       })
-    config.hasPath("schema") match {
-      case true => {
-        val schemaJson = JSON.parseObject(config.getString("schema"))
-        schema = SparkStructTypeUtil.getStructType(schema, schemaJson)
-      }
-      case false => {}
+
+    if (config.hasPath("schema")) {
+      val schemaJson = JSON.parseObject(config.getString("schema"))
+      schema = SparkStructTypeUtil.getStructType(schema, schemaJson)
     }
+
     readConfig = ReadConfig(map)
   }
 
@@ -67,16 +66,7 @@ class MongoDB extends SparkBatchSource {
   }
 
   override def checkConfig(): CheckResult = {
-    TypesafeConfigUtils.hasSubConfig(config, confPrefix) match {
-      case true =>
-        val read = TypesafeConfigUtils.extractSubConfig(config, confPrefix, false)
-        read.hasPath("uri") && read.hasPath("database") && read.hasPath("collection") match {
-          case true => CheckResult.success()
-          case false => CheckResult.error(
-              "please specify [readconfig.uri] and [readconfig.database] and [readconfig.collection]")
-        }
-      case false => CheckResult.error("please specify [readconfig]")
-    }
+    CheckConfigUtil.checkAllExists(config, "readconfig.uri", "readconfig.database", "readconfig.collection")
   }
 
 }