You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/05/23 14:13:08 UTC
[linkis] branch dev-1.4.0 updated: spark etl support mongo and es (#4560)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new 42d070df6 spark etl support mongo and es (#4560)
42d070df6 is described below
commit 42d070df60cdd81a0af5b13ecb165f284d6d0385
Author: ChengJie1053 <18...@163.com>
AuthorDate: Tue May 23 22:13:00 2023 +0800
spark etl support mongo and es (#4560)
---
linkis-engineconn-plugins/spark/scala-2.12/pom.xml | 22 ++++
.../spark/datacalc/TestElasticsearchCala.scala | 122 +++++++++++++++++++++
.../spark/datacalc/TestMongoCala.scala | 120 ++++++++++++++++++++
.../datacalc/sink/ElasticsearchSinkConfig.java | 101 +++++++++++++++++
.../spark/datacalc/sink/MongoSinkConfig.java | 71 ++++++++++++
.../datacalc/source/ElasticsearchSourceConfig.java | 95 ++++++++++++++++
.../spark/datacalc/source/MongoSourceConfig.java | 55 ++++++++++
.../spark/datacalc/util/PluginUtil.java | 4 +
.../spark/datacalc/sink/ElasticsearchSink.scala | 54 +++++++++
.../spark/datacalc/sink/MongoSink.scala | 52 +++++++++
.../datacalc/source/ElasticsearchSource.scala | 48 ++++++++
.../spark/datacalc/source/MongoSource.scala | 45 ++++++++
.../spark/datacalc/TestRocketmqCala.scala | 16 +--
13 files changed, 795 insertions(+), 10 deletions(-)
diff --git a/linkis-engineconn-plugins/spark/scala-2.12/pom.xml b/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
index aa8f5b75a..a4485b00d 100644
--- a/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
+++ b/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
@@ -116,6 +116,29 @@
<artifactId>hudi-spark3.2-bundle_${scala.binary.version}</artifactId>
<version>${hudi.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.mongodb.spark</groupId>
+ <artifactId>mongo-spark-connector_${scala.binary.version}</artifactId>
+ <version>3.0.1</version>
+ </dependency>
+
+ <!-- The excluded Spark artifacts use the same scala.binary.version suffix as the connector itself. -->
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch-spark-30_${scala.binary.version}</artifactId>
+ <version>7.17.7</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala
new file mode 100644
index 000000000..0ea4ab4cc
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.common.io.FsPath
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData
+
+import org.junit.jupiter.api.{Assertions, Test}
+
+/**
+ * Parse-only checks for the Elasticsearch source and sink plugins of the data-calc ETL.
+ *
+ * Each test turns a JSON config into a plugin chain and asserts that sources, transforms and sinks
+ * were built. `DataCalcExecution.execute` is never called, so no Elasticsearch should be needed.
+ */
+class TestElasticsearchCala {
+
+  /** Filesystem path of the test class-path root; replaces `{filePath}` in the JSON configs. */
+  val filePath = this.getClass.getResource("/").getFile
+
+  @Test
+  def testElasticsearchWrite: Unit = assertPluginsParsed(elasticsearchWriteConfigJson)
+
+  @Test
+  def testElasticsearchReader: Unit = assertPluginsParsed(elasticsearchReaderConfigJson)
+
+  /**
+   * Substitutes the `{filePath}` placeholder, parses the config and asserts that every plugin group
+   * was produced. Does nothing on Windows (see the `skip os: windows` guard below).
+   */
+  private def assertPluginsParsed(configJson: String): Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      val data = DataCalcGroupData.getData(configJson.replace("{filePath}", filePath))
+      Assertions.assertTrue(data != null)
+
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertTrue(sources != null)
+      Assertions.assertTrue(transforms != null)
+      Assertions.assertTrue(sinks != null)
+    }
+  }
+
+  val elasticsearchWriteConfigJson =
+    """
+      |{
+      |  "sources": [
+      |    {
+      |      "name": "file",
+      |      "type": "source",
+      |      "config": {
+      |        "resultTable": "T1654611700631",
+      |        "path": "file://{filePath}/etltest.dolphin",
+      |        "serializer": "csv",
+      |        "options": {
+      |          "header":"true",
+      |          "delimiter":";"
+      |        },
+      |        "columnNames": ["name", "age"]
+      |      }
+      |    }
+      |  ],
+      |  "sinks": [
+      |    {
+      |      "name": "elasticsearch",
+      |      "config": {
+      |        "sourceTable": "T1654611700631",
+      |        "node": "localhost",
+      |        "port": "9200",
+      |        "index": "estest",
+      |        "saveMode": "overwrite"
+      |      }
+      |    }
+      |  ]
+      |}
+      |""".stripMargin
+
+  val elasticsearchReaderConfigJson =
+    """
+      |{
+      |  "sources": [
+      |    {
+      |      "name": "elasticsearch",
+      |      "type": "source",
+      |      "config": {
+      |        "resultTable": "T1654611700631",
+      |        "node": "localhost",
+      |        "port": "9200",
+      |        "index": "estest"
+      |      }
+      |    }
+      |  ],
+      |  "sinks": [
+      |    {
+      |      "name": "file",
+      |      "config": {
+      |        "sourceTable": "T1654611700631",
+      |        "path": "file://{filePath}/csv",
+      |        "saveMode": "overwrite",
+      |        "serializer": "csv"
+      |      }
+      |    }
+      |  ]
+      |}
+      |""".stripMargin
+
+}
diff --git a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala
new file mode 100644
index 000000000..a7fda8666
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.common.io.FsPath
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData
+
+import org.junit.jupiter.api.{Assertions, Test}
+
+/**
+ * Parse-only checks for the MongoDB source and sink plugins of the data-calc ETL.
+ *
+ * Each test turns a JSON config into a plugin chain and asserts that sources, transforms and sinks
+ * were built. `DataCalcExecution.execute` is never called, so no MongoDB should be needed.
+ */
+class TestMongoCala {
+
+  /** Filesystem path of the test class-path root; replaces `{filePath}` in the JSON configs. */
+  val filePath = this.getClass.getResource("/").getFile
+
+  @Test
+  def testMongoWrite: Unit = assertPluginsParsed(mongoWriteConfigJson)
+
+  @Test
+  def testMongoReader: Unit = assertPluginsParsed(mongoReaderConfigJson)
+
+  /**
+   * Substitutes the `{filePath}` placeholder, parses the config and asserts that every plugin group
+   * was produced. Does nothing on Windows (see the `skip os: windows` guard below).
+   */
+  private def assertPluginsParsed(configJson: String): Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      val data = DataCalcGroupData.getData(configJson.replace("{filePath}", filePath))
+      Assertions.assertTrue(data != null)
+
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertTrue(sources != null)
+      Assertions.assertTrue(transforms != null)
+      Assertions.assertTrue(sinks != null)
+    }
+  }
+
+  val mongoWriteConfigJson =
+    """
+      |{
+      |  "sources": [
+      |    {
+      |      "name": "file",
+      |      "type": "source",
+      |      "config": {
+      |        "resultTable": "T1654611700631",
+      |        "path": "file://{filePath}/etltest.dolphin",
+      |        "serializer": "csv",
+      |        "options": {
+      |          "header":"true",
+      |          "delimiter":";"
+      |        },
+      |        "columnNames": ["name", "age"]
+      |      }
+      |    }
+      |  ],
+      |  "sinks": [
+      |    {
+      |      "name": "mongo",
+      |      "config": {
+      |        "sourceTable": "T1654611700631",
+      |        "uri": "mongodb://localhost:27017/test",
+      |        "database": "test",
+      |        "collection": "test",
+      |        "saveMode": "overwrite"
+      |      }
+      |    }
+      |  ]
+      |}
+      |""".stripMargin
+
+  val mongoReaderConfigJson =
+    """
+      |{
+      |  "sources": [
+      |    {
+      |      "name": "mongo",
+      |      "type": "source",
+      |      "config": {
+      |        "resultTable": "T1654611700631",
+      |        "uri": "mongodb://localhost:27017/test",
+      |        "database": "test",
+      |        "collection": "test"
+      |      }
+      |    }
+      |  ],
+      |  "sinks": [
+      |    {
+      |      "name": "file",
+      |      "config": {
+      |        "sourceTable": "T1654611700631",
+      |        "path": "file://{filePath}/json",
+      |        "saveMode": "overwrite",
+      |        "serializer": "json"
+      |      }
+      |    }
+      |  ]
+      |}
+      |""".stripMargin
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java
new file mode 100644
index 000000000..64e69e498
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+/**
+ * Settings for the {@code elasticsearch} data-calc sink.
+ *
+ * <p>{@code node}, {@code port} and {@code index} must be non-blank. {@code type} defaults to
+ * {@code _doc}, {@code user} and {@code password} default to empty strings, and {@code saveMode}
+ * defaults to {@code overwrite} and must be one of the save modes accepted by its pattern.
+ */
+public class ElasticsearchSinkConfig extends SinkConfig {
+
+  @NotBlank private String node;
+
+  @NotBlank private String port;
+
+  @NotBlank private String index;
+
+  private String type = "_doc";
+
+  private String user = "";
+
+  private String password = "";
+
+  @NotBlank
+  @Pattern(
+      regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+      message =
+          "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+  private String saveMode = "overwrite";
+
+  public String getNode() {
+    return this.node;
+  }
+
+  public String getPort() {
+    return this.port;
+  }
+
+  public String getIndex() {
+    return this.index;
+  }
+
+  public String getType() {
+    return this.type;
+  }
+
+  public String getUser() {
+    return this.user;
+  }
+
+  public String getPassword() {
+    return this.password;
+  }
+
+  public String getSaveMode() {
+    return this.saveMode;
+  }
+
+  public void setNode(String node) {
+    this.node = node;
+  }
+
+  public void setPort(String port) {
+    this.port = port;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public void setSaveMode(String saveMode) {
+    this.saveMode = saveMode;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/MongoSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/MongoSinkConfig.java
new file mode 100644
index 000000000..072bad67c
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/MongoSinkConfig.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+/**
+ * Settings for the {@code mongo} data-calc sink.
+ *
+ * <p>{@code uri}, {@code database} and {@code collection} must be non-blank; {@code saveMode}
+ * defaults to {@code overwrite} and must be one of the save modes accepted by its pattern.
+ */
+public class MongoSinkConfig extends SinkConfig {
+
+  @NotBlank private String uri;
+
+  @NotBlank private String database;
+
+  @NotBlank private String collection;
+
+  @NotBlank
+  @Pattern(
+      regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+      message =
+          "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+  private String saveMode = "overwrite";
+
+  public String getUri() {
+    return this.uri;
+  }
+
+  public String getDatabase() {
+    return this.database;
+  }
+
+  public String getCollection() {
+    return this.collection;
+  }
+
+  public String getSaveMode() {
+    return this.saveMode;
+  }
+
+  public void setUri(String uri) {
+    this.uri = uri;
+  }
+
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+  public void setCollection(String collection) {
+    this.collection = collection;
+  }
+
+  public void setSaveMode(String saveMode) {
+    this.saveMode = saveMode;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/ElasticsearchSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/ElasticsearchSourceConfig.java
new file mode 100644
index 000000000..feed085ac
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/ElasticsearchSourceConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+
+/**
+ * Settings for the {@code elasticsearch} data-calc source.
+ *
+ * <p>{@code node}, {@code port} and {@code index} must be non-blank. {@code user} and
+ * {@code password} default to empty strings, {@code query} defaults to a {@code match_all}
+ * query and {@code type} defaults to {@code _doc}.
+ */
+public class ElasticsearchSourceConfig extends SourceConfig {
+
+  @NotBlank private String node;
+
+  @NotBlank private String port;
+
+  @NotBlank private String index;
+
+  private String user = "";
+
+  private String password = "";
+
+  private String query = "{\"query\":{\"match_all\":{}}}";
+
+  private String type = "_doc";
+
+  public String getNode() {
+    return this.node;
+  }
+
+  public String getPort() {
+    return this.port;
+  }
+
+  public String getIndex() {
+    return this.index;
+  }
+
+  public String getUser() {
+    return this.user;
+  }
+
+  public String getPassword() {
+    return this.password;
+  }
+
+  public String getQuery() {
+    return this.query;
+  }
+
+  public String getType() {
+    return this.type;
+  }
+
+  public void setNode(String node) {
+    this.node = node;
+  }
+
+  public void setPort(String port) {
+    this.port = port;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public void setQuery(String query) {
+    this.query = query;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/MongoSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/MongoSourceConfig.java
new file mode 100644
index 000000000..83deed369
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/MongoSourceConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+
+/**
+ * Settings for the {@code mongo} data-calc source: the connection {@code uri}, {@code database}
+ * and {@code collection}, all of which must be non-blank.
+ */
+public class MongoSourceConfig extends SourceConfig {
+
+  @NotBlank private String uri;
+
+  @NotBlank private String database;
+
+  @NotBlank private String collection;
+
+  public String getUri() {
+    return this.uri;
+  }
+
+  public String getDatabase() {
+    return this.database;
+  }
+
+  public String getCollection() {
+    return this.collection;
+  }
+
+  public void setUri(String uri) {
+    this.uri = uri;
+  }
+
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+  public void setCollection(String collection) {
+    this.collection = collection;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
index d4628bb82..f0e62983c 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
@@ -47,6 +47,8 @@ public class PluginUtil {
classMap.put("redis", RedisSource.class);
classMap.put("datalake", DataLakeSource.class);
classMap.put("rocketmq", RocketmqSource.class);
+ classMap.put("mongo", MongoSource.class);
+ classMap.put("elasticsearch", ElasticsearchSource.class);
return classMap;
}
@@ -65,6 +67,8 @@ public class PluginUtil {
classMap.put("redis", RedisSink.class);
classMap.put("datalake", DataLakeSink.class);
classMap.put("rocketmq", RocketmqSink.class);
+ classMap.put("mongo", MongoSink.class);
+ classMap.put("elasticsearch", ElasticsearchSink.class);
return classMap;
}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala
new file mode 100644
index 000000000..37e56c7fc
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import scala.collection.JavaConverters._
+
+class ElasticsearchSink extends DataCalcSink[ElasticsearchSinkConfig] with Logging {
+
+  /**
+   * Writes `ds` to the Elasticsearch resource `index/type` through the elasticsearch-hadoop
+   * Spark SQL data source ("org.elasticsearch.spark.sql").
+   *
+   * Option precedence, lowest to highest: the `es.index.auto.create=true` default, the
+   * user-supplied `config.getOptions`, then the typed node/port/credential fields. Credentials are
+   * only applied when non-blank, so the empty-string `user`/`password` defaults do not clobber
+   * credentials passed through `options`.
+   */
+  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+    val userOptions: Map[String, String] =
+      Option(config.getOptions).map(_.asScala.toMap).getOrElse(Map.empty)
+
+    // Blank credentials are skipped instead of overriding values supplied via `options`.
+    val authOptions = Seq(
+      "es.net.http.auth.user" -> config.getUser,
+      "es.net.http.auth.pass" -> config.getPassword
+    ).filter { case (_, value) => StringUtils.isNotBlank(value) }
+
+    val options = Map("es.index.auto.create" -> "true") ++ userOptions ++
+      Map("es.nodes" -> config.getNode, "es.port" -> config.getPort) ++ authOptions
+
+    val writer = ds.write.format("org.elasticsearch.spark.sql")
+    if (StringUtils.isNotBlank(config.getSaveMode)) {
+      writer.mode(config.getSaveMode)
+    }
+
+    logger.info(
+      s"Load data to elasticsearch nodes: ${config.getNode}, port: ${config.getPort}, index: ${config.getIndex}"
+    )
+    writer.options(options).save(s"${config.getIndex}/${config.getType}")
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/MongoSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/MongoSink.scala
new file mode 100644
index 000000000..d26df410c
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/MongoSink.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import scala.collection.JavaConverters._
+
+class MongoSink extends DataCalcSink[MongoSinkConfig] with Logging {
+
+  /**
+   * Writes `ds` to the MongoDB collection named by the config via the "mongo" data source.
+   *
+   * Entries from `config.getOptions` are merged first and the typed uri/database/collection
+   * settings last, so the typed settings win on key collisions. The save mode is only applied
+   * when it is non-blank.
+   */
+  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+    val connectionOptions = Map(
+      "spark.mongodb.output.database" -> config.getDatabase,
+      "spark.mongodb.output.collection" -> config.getCollection,
+      "spark.mongodb.output.uri" -> config.getUri
+    )
+    val userOptions = config.getOptions match {
+      case null => Map.empty[String, String]
+      case javaMap => javaMap.asScala.toMap
+    }
+    val mergedOptions = userOptions ++ connectionOptions
+
+    val saveMode = config.getSaveMode
+    val writer = ds.write.format("mongo")
+    if (!StringUtils.isBlank(saveMode)) writer.mode(saveMode)
+
+    logger.info(
+      s"Load data to mongo uri: ${config.getUri}, database: ${config.getDatabase}, collection: ${config.getCollection}"
+    )
+    writer.options(mergedOptions).save()
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/ElasticsearchSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/ElasticsearchSource.scala
new file mode 100644
index 000000000..f06a40db9
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/ElasticsearchSource.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+class ElasticsearchSource extends DataCalcSource[ElasticsearchSourceConfig] with Logging {
+
+  /**
+   * Reads the Elasticsearch resource `index/type` through the elasticsearch-hadoop Spark SQL
+   * data source, filtered by `config.getQuery`.
+   *
+   * User-supplied `config.getOptions` are applied first and the typed config fields override
+   * them. Credentials are only applied when non-blank, so the empty-string `user`/`password`
+   * defaults do not clobber credentials passed through `options`.
+   */
+  override def getData(spark: SparkSession): Dataset[Row] = {
+    val reader = spark.read.format("org.elasticsearch.spark.sql")
+
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      reader.options(config.getOptions)
+    }
+
+    // Blank credentials are skipped instead of overriding values supplied via `options`.
+    Seq(
+      "es.net.http.auth.user" -> config.getUser,
+      "es.net.http.auth.pass" -> config.getPassword
+    ).filter { case (_, value) => StringUtils.isNotBlank(value) }
+      .foreach { case (key, value) => reader.option(key, value) }
+
+    logger.info(
+      s"Load data from elasticsearch nodes: ${config.getNode}, port: ${config.getPort}, index: ${config.getIndex}"
+    )
+
+    reader
+      .option("es.nodes", config.getNode)
+      .option("es.port", config.getPort)
+      .option("es.query", config.getQuery)
+      .option("es.resource", s"${config.getIndex}/${config.getType}")
+      .load()
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/MongoSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/MongoSource.scala
new file mode 100644
index 000000000..ad26cd92d
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/MongoSource.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+class MongoSource extends DataCalcSource[MongoSourceConfig] with Logging {
+
+  /**
+   * Loads the MongoDB collection named by the config through the "mongo" data source.
+   *
+   * Non-empty `config.getOptions` are applied first; the typed database/collection/uri settings
+   * are set afterwards and therefore take precedence on key collisions.
+   */
+  override def getData(spark: SparkSession): Dataset[Row] = {
+    val mongoReader = spark.read.format("mongo")
+    Option(config.getOptions)
+      .filterNot(_.isEmpty)
+      .foreach(userOptions => mongoReader.options(userOptions))
+
+    logger.info(
+      s"Load data from mongo uri: ${config.getUri}, database: ${config.getDatabase}, collection: ${config.getCollection}"
+    )
+
+    mongoReader
+      .option("spark.mongodb.input.database", config.getDatabase)
+      .option("spark.mongodb.input.collection", config.getCollection)
+      .option("spark.mongodb.input.uri", config.getUri)
+      .load()
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRocketmqCala.scala b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRocketmqCala.scala
index 37b3ae92e..08d5886dc 100644
--- a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRocketmqCala.scala
+++ b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestRocketmqCala.scala
@@ -20,9 +20,7 @@ package org.apache.linkis.engineplugin.spark.datacalc
import org.apache.linkis.common.io.FsPath
import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData
-import org.apache.spark.sql.SparkSession
-
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.{Assertions, Test};
class TestRocketmqCala {
@@ -33,14 +31,12 @@ class TestRocketmqCala {
// skip os: windows
if (!FsPath.WINDOWS) {
val data = DataCalcGroupData.getData(rocketmqReaderConfigJson)
- val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+ Assertions.assertTrue(data != null)
- val spark = SparkSession
- .builder()
- .master("local")
- .getOrCreate()
-
- DataCalcExecution.execute(spark, sources, transforms, sinks)
+ val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+ Assertions.assertTrue(sources != null)
+ Assertions.assertTrue(transforms != null)
+ Assertions.assertTrue(sinks != null)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org