You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/05/23 14:13:08 UTC

[linkis] branch dev-1.4.0 updated: spark etl support mongo and es (#4560)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new 42d070df6 spark etl support mongo and es (#4560)
42d070df6 is described below

commit 42d070df60cdd81a0af5b13ecb165f284d6d0385
Author: ChengJie1053 <18...@163.com>
AuthorDate: Tue May 23 22:13:00 2023 +0800

    spark etl support mongo and es (#4560)
---
 linkis-engineconn-plugins/spark/scala-2.12/pom.xml |  22 ++++
 .../spark/datacalc/TestElasticsearchCala.scala     | 122 +++++++++++++++++++++
 .../spark/datacalc/TestMongoCala.scala             | 120 ++++++++++++++++++++
 .../datacalc/sink/ElasticsearchSinkConfig.java     | 101 +++++++++++++++++
 .../spark/datacalc/sink/MongoSinkConfig.java       |  71 ++++++++++++
 .../datacalc/source/ElasticsearchSourceConfig.java |  95 ++++++++++++++++
 .../spark/datacalc/source/MongoSourceConfig.java   |  55 ++++++++++
 .../spark/datacalc/util/PluginUtil.java            |   4 +
 .../spark/datacalc/sink/ElasticsearchSink.scala    |  54 +++++++++
 .../spark/datacalc/sink/MongoSink.scala            |  52 +++++++++
 .../datacalc/source/ElasticsearchSource.scala      |  48 ++++++++
 .../spark/datacalc/source/MongoSource.scala        |  45 ++++++++
 .../spark/datacalc/TestRocketmqCala.scala          |  16 +--
 13 files changed, 795 insertions(+), 10 deletions(-)

diff --git a/linkis-engineconn-plugins/spark/scala-2.12/pom.xml b/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
index aa8f5b75a..a4485b00d 100644
--- a/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
+++ b/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
@@ -116,6 +116,28 @@
       <artifactId>hudi-spark3.2-bundle_${scala.binary.version}</artifactId>
       <version>${hudi.version}</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.mongodb.spark</groupId>
+      <artifactId>mongo-spark-connector_${scala.binary.version}</artifactId>
+      <version>3.0.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.elasticsearch</groupId>
+      <artifactId>elasticsearch-spark-30_${scala.binary.version}</artifactId>
+      <version>7.17.7</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-sql_2.12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-catalyst_2.12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala
new file mode 100644
index 000000000..0ea4ab4cc
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.common.io.FsPath
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData
+
+import org.junit.jupiter.api.{Assertions, Test};
+
+class TestElasticsearchCala {
+
+  val filePath = this.getClass.getResource("/").getFile
+
+  @Test
+  def testExcelWrite: Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      val data =
+        DataCalcGroupData.getData(elasticsearchWriteConfigJson.replace("{filePath}", filePath))
+      Assertions.assertTrue(data != null)
+
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertTrue(sources != null)
+      Assertions.assertTrue(transforms != null)
+      Assertions.assertTrue(sinks != null)
+    }
+  }
+
+  @Test
+  def testExcelReader: Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      val data =
+        DataCalcGroupData.getData(elasticsearchReaderConfigJson.replace("{filePath}", filePath))
+      Assertions.assertTrue(data != null)
+
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertTrue(sources != null)
+      Assertions.assertTrue(transforms != null)
+      Assertions.assertTrue(sinks != null)
+    }
+  }
+
+  val elasticsearchWriteConfigJson =
+    """
+      |{
+      |    "sources": [
+      |        {
+      |            "name": "file",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "path": "file://{filePath}/etltest.dolphin",
+      |                "serializer": "csv",
+      |                "options": {
+      |                "header":"true",
+      |                "delimiter":";"
+      |                },
+      |                "columnNames": ["name", "age"]
+      |            }
+      |        }
+      |    ],
+      |    "sinks": [
+      |        {
+      |            "name": "elasticsearch",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "node": "localhost",
+      |                "port": "9200",
+      |                "index": "estest",
+      |                "saveMode": "overwrite"
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
+  val elasticsearchReaderConfigJson =
+    """
+      |{
+      |    "sources": [
+      |        {
+      |            "name": "elasticsearch",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "node": "localhost",
+      |                "port": "9200",
+      |                "index": "estest"
+      |            }
+      |        }
+      |    ],
+      |    "sinks": [
+      |        {
+      |            "name": "file",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "path": "file://{filePath}/csv",
+      |                "saveMode": "overwrite",
+      |                "serializer": "csv"
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
+}
diff --git a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala
new file mode 100644
index 000000000..a7fda8666
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.common.io.FsPath
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData
+
+import org.junit.jupiter.api.{Assertions, Test};
+
+class TestMongoCala {
+
+  val filePath = this.getClass.getResource("/").getFile
+
+  @Test
+  def testExcelWrite: Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      val data = DataCalcGroupData.getData(mongoWriteConfigJson.replace("{filePath}", filePath))
+      Assertions.assertTrue(data != null)
+
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertTrue(sources != null)
+      Assertions.assertTrue(transforms != null)
+      Assertions.assertTrue(sinks != null)
+    }
+  }
+
+  @Test
+  def testExcelReader: Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      val data = DataCalcGroupData.getData(mongoReaderConfigJson.replace("{filePath}", filePath))
+      Assertions.assertTrue(data != null)
+
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertTrue(sources != null)
+      Assertions.assertTrue(transforms != null)
+      Assertions.assertTrue(sinks != null)
+    }
+  }
+
+  val mongoWriteConfigJson =
+    """
+      |{
+      |    "sources": [
+      |        {
+      |            "name": "file",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "path": "file://{filePath}/etltest.dolphin",
+      |                "serializer": "csv",
+      |                "options": {
+      |                "header":"true",
+      |                "delimiter":";"
+      |                },
+      |                "columnNames": ["name", "age"]
+      |            }
+      |        }
+      |    ],
+      |    "sinks": [
+      |        {
+      |            "name": "mongo",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "uri": "mongodb://localhost:27017/test",
+      |                "database": "test",
+      |                "collection": "test",
+      |                "saveMode": "overwrite"
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
+  val mongoReaderConfigJson =
+    """
+      |{
+      |    "sources": [
+      |        {
+      |            "name": "mongo",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "uri": "mongodb://localhost:27017/test",
+      |                "database": "test",
+      |                "collection": "test"
+      |            }
+      |        }
+      |    ],
+      |    "sinks": [
+      |        {
+      |            "name": "file",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "path": "file://{filePath}/json",
+      |                "saveMode": "overwrite",
+      |                "serializer": "json"
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java
new file mode 100644
index 000000000..64e69e498
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+public class ElasticsearchSinkConfig extends SinkConfig {
+
+  @NotBlank private String node;
+
+  @NotBlank private String port;
+
+  @NotBlank private String index;
+
+  private String type = "_doc";
+
+  private String user = "";
+
+  private String password = "";
+
+  @NotBlank
+  @Pattern(
+      regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+      message =
+          "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+  private String saveMode = "overwrite";
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public String getSaveMode() {
+    return saveMode;
+  }
+
+  public void setSaveMode(String saveMode) {
+    this.saveMode = saveMode;
+  }
+
+  public String getNode() {
+    return node;
+  }
+
+  public void setNode(String node) {
+    this.node = node;
+  }
+
+  public String getPort() {
+    return port;
+  }
+
+  public void setPort(String port) {
+    this.port = port;
+  }
+
+  public String getIndex() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/MongoSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/MongoSinkConfig.java
new file mode 100644
index 000000000..072bad67c
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/MongoSinkConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+public class MongoSinkConfig extends SinkConfig {
+
+  @NotBlank private String uri;
+
+  @NotBlank private String database;
+
+  @NotBlank private String collection;
+
+  @NotBlank
+  @Pattern(
+      regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+      message =
+          "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+  private String saveMode = "overwrite";
+
+  public String getSaveMode() {
+    return saveMode;
+  }
+
+  public void setSaveMode(String saveMode) {
+    this.saveMode = saveMode;
+  }
+
+  public String getUri() {
+    return uri;
+  }
+
+  public void setUri(String uri) {
+    this.uri = uri;
+  }
+
+  public String getDatabase() {
+    return database;
+  }
+
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+  public String getCollection() {
+    return collection;
+  }
+
+  public void setCollection(String collection) {
+    this.collection = collection;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/ElasticsearchSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/ElasticsearchSourceConfig.java
new file mode 100644
index 000000000..feed085ac
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/ElasticsearchSourceConfig.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+
+public class ElasticsearchSourceConfig extends SourceConfig {
+
+  @NotBlank private String node;
+
+  @NotBlank private String port;
+
+  @NotBlank private String index;
+
+  private String user = "";
+
+  private String password = "";
+
+  private String query = "{\"query\":{\"match_all\":{}}}";
+
+  private String type = "_doc";
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public String getNode() {
+    return node;
+  }
+
+  public void setNode(String node) {
+    this.node = node;
+  }
+
+  public String getPort() {
+    return port;
+  }
+
+  public void setPort(String port) {
+    this.port = port;
+  }
+
+  public String getIndex() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  public String getQuery() {
+    return query;
+  }
+
+  public void setQuery(String query) {
+    this.query = query;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/MongoSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/MongoSourceConfig.java
new file mode 100644
index 000000000..83deed369
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/MongoSourceConfig.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+
+public class MongoSourceConfig extends SourceConfig {
+
+  @NotBlank private String uri;
+
+  @NotBlank private String database;
+
+  @NotBlank private String collection;
+
+  public String getUri() {
+    return uri;
+  }
+
+  public void setUri(String uri) {
+    this.uri = uri;
+  }
+
+  public String getDatabase() {
+    return database;
+  }
+
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+  public String getCollection() {
+    return collection;
+  }
+
+  public void setCollection(String collection) {
+    this.collection = collection;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
index d4628bb82..f0e62983c 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
@@ -47,6 +47,8 @@ public class PluginUtil {
     classMap.put("redis", RedisSource.class);
     classMap.put("datalake", DataLakeSource.class);
     classMap.put("rocketmq", RocketmqSource.class);
+    classMap.put("mongo", MongoSource.class);
+    classMap.put("elasticsearch", ElasticsearchSource.class);
     return classMap;
   }
 
@@ -65,6 +67,8 @@ public class PluginUtil {
     classMap.put("redis", RedisSink.class);
     classMap.put("datalake", DataLakeSink.class);
     classMap.put("rocketmq", RocketmqSink.class);
+    classMap.put("mongo", MongoSink.class);
+    classMap.put("elasticsearch", ElasticsearchSink.class);
     return classMap;
   }
 
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala
new file mode 100644
index 000000000..37e56c7fc
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import scala.collection.JavaConverters._
+
+class ElasticsearchSink extends DataCalcSink[ElasticsearchSinkConfig] with Logging {
+
+  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+    var options = Map(
+      "es.index.auto.create" -> "true",
+      "es.nodes" -> config.getNode,
+      "es.port" -> config.getPort,
+      "es.net.http.auth.user" -> config.getUser,
+      "es.net.http.auth.pass" -> config.getPassword
+    )
+
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      options = config.getOptions.asScala.toMap ++ options
+    }
+
+    val writer = ds.write.format("org.elasticsearch.spark.sql")
+    if (StringUtils.isNotBlank(config.getSaveMode)) {
+      writer.mode(config.getSaveMode)
+    }
+
+    logger.info(
+      s"Load data to elasticsearch nodes: ${config.getNode}, port: ${config.getPort}, index: ${config.getIndex}"
+    )
+    writer.options(options).save(s"${config.getIndex}/${config.getType}")
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/MongoSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/MongoSink.scala
new file mode 100644
index 000000000..d26df410c
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/MongoSink.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import scala.collection.JavaConverters._
+
+class MongoSink extends DataCalcSink[MongoSinkConfig] with Logging {
+
+  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+    var options = Map(
+      "spark.mongodb.output.database" -> config.getDatabase,
+      "spark.mongodb.output.collection" -> config.getCollection,
+      "spark.mongodb.output.uri" -> config.getUri
+    )
+
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      options = config.getOptions.asScala.toMap ++ options
+    }
+
+    val writer = ds.write.format("mongo")
+    if (StringUtils.isNotBlank(config.getSaveMode)) {
+      writer.mode(config.getSaveMode)
+    }
+
+    logger.info(
+      s"Load data to mongo uri: ${config.getUri}, database: ${config.getDatabase}, collection: ${config.getCollection}"
+    )
+    writer.options(options).save()
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/ElasticsearchSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/ElasticsearchSource.scala
new file mode 100644
index 000000000..f06a40db9
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/ElasticsearchSource.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+class ElasticsearchSource extends DataCalcSource[ElasticsearchSourceConfig] with Logging {
+
+  override def getData(spark: SparkSession): Dataset[Row] = {
+    val reader = spark.read.format("org.elasticsearch.spark.sql")
+
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      reader.options(config.getOptions)
+    }
+
+    logger.info(
+      s"Load data from elasticsearch nodes: ${config.getNode}, port: ${config.getPort}, index: ${config.getIndex}"
+    )
+
+    reader
+      .option("es.nodes", config.getNode)
+      .option("es.port", config.getPort)
+      .option("es.net.http.auth.user", config.getUser)
+      .option("es.net.http.auth.pass", config.getPassword)
+      .option("es.query", config.getQuery)
+      .option("es.resource", s"${config.getIndex}/${config.getType}")
+      .load()
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/MongoSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/MongoSource.scala
new file mode 100644
index 000000000..ad26cd92d
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/MongoSource.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+class MongoSource extends DataCalcSource[MongoSourceConfig] with Logging {
+
+  override def getData(spark: SparkSession): Dataset[Row] = {
+    val reader = spark.read.format("mongo")
+
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      reader.options(config.getOptions)
+    }
+
+    logger.info(
+      s"Load data from mongo uri: ${config.getUri}, database: ${config.getDatabase}, collection: ${config.getCollection}"
+    )
+
+    reader
+      .option("spark.mongodb.input.database", config.getDatabase)
+      .option("spark.mongodb.input.collection", config.getCollection)
+      .option("spark.mongodb.input.uri", config.getUri)
+      .load()
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRocketmqCala.scala b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRocketmqCala.scala
index 37b3ae92e..08d5886dc 100644
--- a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRocketmqCala.scala
+++ b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRocketmqCala.scala
@@ -20,9 +20,7 @@ package org.apache.linkis.engineplugin.spark.datacalc
 import org.apache.linkis.common.io.FsPath
 import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData
 
-import org.apache.spark.sql.SparkSession
-
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.{Assertions, Test};
 
 class TestRocketmqCala {
 
@@ -33,14 +31,12 @@ class TestRocketmqCala {
     // skip os: windows
     if (!FsPath.WINDOWS) {
       val data = DataCalcGroupData.getData(rocketmqReaderConfigJson)
-      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertTrue(data != null)
 
-      val spark = SparkSession
-        .builder()
-        .master("local")
-        .getOrCreate()
-
-      DataCalcExecution.execute(spark, sources, transforms, sinks)
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertTrue(sources != null)
+      Assertions.assertTrue(transforms != null)
+      Assertions.assertTrue(sinks != null)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org