You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ri...@apache.org on 2022/01/23 08:06:29 UTC
[incubator-seatunnel] branch dev updated: [Improve][seatunnel-connector-spark-mongodb] Optimize the mongodb source connector (#1133)
This is an automated email from the ASF dual-hosted git repository.
rickyhuo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b9f7703 [Improve][seatunnel-connector-spark-mongodb] Optimize the mongodb source connector (#1133)
b9f7703 is described below
commit b9f7703d76a490414619e0af7d2bbc1cb115f37d
Author: wuchunfu <31...@qq.com>
AuthorDate: Sun Jan 23 16:06:22 2022 +0800
[Improve][seatunnel-connector-spark-mongodb] Optimize the mongodb source connector (#1133)
---
.../spark/configuration/source-plugins/MongoDB.md | 29 ++++++++++++++++------
.../apache/seatunnel/spark/source/MongoDB.scala | 24 ++++++------------
2 files changed, 28 insertions(+), 25 deletions(-)
diff --git a/docs/en/spark/configuration/source-plugins/MongoDB.md b/docs/en/spark/configuration/source-plugins/MongoDB.md
index f2dc764..2d78d35 100644
--- a/docs/en/spark/configuration/source-plugins/MongoDB.md
+++ b/docs/en/spark/configuration/source-plugins/MongoDB.md
@@ -6,11 +6,14 @@ Read data from MongoDB.
## Options
-| name | type | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| readconfig.uri | string | yes | - |
-| readconfig.database | string | yes | - |
-| readconfig.collection | string | yes | - |
+| name | type | required | default value |
+|-----------------------| ------ |----------|---------------|
+| readconfig.uri | string | yes | - |
+| readconfig.database | string | yes | - |
+| readconfig.collection | string | yes | - |
+| readconfig.* | string | no | - |
+| schema | string | no | - |
+| common-options | string | yes | - |
### readconfig.uri [string]
@@ -24,6 +27,14 @@ MongoDB database
MongoDB collection
+### readconfig.* [string]
+
+Other parameters can also be configured here; see the Input Configuration section of [MongoDB Configuration](https://docs.mongodb.com/spark-connector/current/configuration/) for details. Parameters are specified by prefixing the original parameter name with `readconfig.` — for example, to set `spark.mongodb.input.partitioner`, use `readconfig.spark.mongodb.input.partitioner="MongoPaginateBySizePartitioner"` . If you do not specify these optional parameters, the default values of these parameters will be used.
+
+### schema [string]
+
+Because `MongoDB` does not have the concept of a `schema`, when Spark reads from `MongoDB` it samples the `MongoDB` data and infers the `schema`. In practice this process can be slow and may be inaccurate. This parameter can be specified manually to avoid these problems. `schema` is a `json` string, such as `{\"name\":\"string\",\"age\":\"integer\",\"addrs\":{\"country\":\"string\",\"city\":\"string\"}}`
+
### common options [string]
Source Plugin common parameters, refer to [Source Plugin](./source-plugin.md) for details
@@ -32,9 +43,11 @@ Source Plugin common parameters, refer to [Source Plugin](./source-plugin.md) fo
```bash
mongodb {
- readconfig.uri = "mongodb://username:password@192.168.0.1:27017/test"
- readconfig.database = "test"
- readconfig.collection = "collection1"
+ readconfig.uri = "mongodb://username:password@127.0.0.1:27017/mypost"
+ readconfig.database = "mydatabase"
+ readconfig.collection = "mycollection"
+ readconfig.spark.mongodb.input.partitioner = "MongoPaginateBySizePartitioner"
+ schema="{\"name\":\"string\",\"age\":\"integer\",\"addrs\":{\"country\":\"string\",\"city\":\"string\"}}"
result_table_name = "mongodb_result_table"
}
```
diff --git a/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala b/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala
index c177bcc..74d595e 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/source/MongoDB.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConversions._
import com.alibaba.fastjson.JSON
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
-import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
+import org.apache.seatunnel.common.config.{CheckConfigUtil, CheckResult, TypesafeConfigUtils}
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
import org.apache.seatunnel.spark.utils.SparkStructTypeUtil
@@ -47,13 +47,12 @@ class MongoDB extends SparkBatchSource {
val value = String.valueOf(entry.getValue.unwrapped())
map.put(key, value)
})
- config.hasPath("schema") match {
- case true => {
- val schemaJson = JSON.parseObject(config.getString("schema"))
- schema = SparkStructTypeUtil.getStructType(schema, schemaJson)
- }
- case false => {}
+
+ if (config.hasPath("schema")) {
+ val schemaJson = JSON.parseObject(config.getString("schema"))
+ schema = SparkStructTypeUtil.getStructType(schema, schemaJson)
}
+
readConfig = ReadConfig(map)
}
@@ -67,16 +66,7 @@ class MongoDB extends SparkBatchSource {
}
override def checkConfig(): CheckResult = {
- TypesafeConfigUtils.hasSubConfig(config, confPrefix) match {
- case true =>
- val read = TypesafeConfigUtils.extractSubConfig(config, confPrefix, false)
- read.hasPath("uri") && read.hasPath("database") && read.hasPath("collection") match {
- case true => CheckResult.success()
- case false => CheckResult.error(
- "please specify [readconfig.uri] and [readconfig.database] and [readconfig.collection]")
- }
- case false => CheckResult.error("please specify [readconfig]")
- }
+ CheckConfigUtil.checkAllExists(config, "readconfig.uri", "readconfig.database", "readconfig.collection")
}
}