You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/06/19 02:54:46 UTC
[linkis] branch dev-1.4.0 updated: Spark etl support starrocks (#4650)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new c3cfc90de Spark etl support starrocks (#4650)
c3cfc90de is described below
commit c3cfc90de715f127477fd9284cbae843ec2532e9
Author: ChengJie1053 <18...@163.com>
AuthorDate: Mon Jun 19 10:54:40 2023 +0800
Spark etl support starrocks (#4650)
---
.../spark/datacalc/TestStarrocksCala.scala | 128 +++++++++++++++++++++
.../spark/datacalc/sink/StarrocksSinkConfig.java | 85 ++++++++++++++
.../datacalc/source/StarrocksSourceConfig.java | 73 ++++++++++++
.../spark/datacalc/util/PluginUtil.java | 2 +
.../spark/datacalc/sink/StarrocksSink.scala | 54 +++++++++
.../spark/datacalc/source/StarrocksSource.scala | 49 ++++++++
6 files changed, 391 insertions(+)
diff --git a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestStarrocksCala.scala b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestStarrocksCala.scala
new file mode 100644
index 000000000..54aada34c
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestStarrocksCala.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.common.io.FsPath
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData
+
+import org.junit.jupiter.api.{Assertions, Test}
+
+/**
+ * Tests that DataCalc JSON configs naming the "starrocks" source and sink plugins can be parsed.
+ *
+ * Only `DataCalcGroupData.getData` and `DataCalcExecution.getPlugins` are exercised; the plugins'
+ * read/write methods are never called.
+ */
+class TestStarrocksCala {
+
+  val filePath = this.getClass.getResource("/").getFile
+
+  @Test
+  def testStarrocksWrite: Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      // todo The starrocks connector currently only supports the 'append' mode, using the starrocks 'Primary Key table' to do 'upsert'
+      assertPluginsParsed(starrocksWriteConfigJson)
+    }
+  }
+
+  @Test
+  def testStarrocksReader: Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      assertPluginsParsed(starrocksReaderConfigJson)
+    }
+  }
+
+  /**
+   * Substitutes the test resource directory for the `{filePath}` placeholder in `configJson`,
+   * parses it, and asserts that the group data and the source, transform and sink results exist.
+   */
+  private def assertPluginsParsed(configJson: String): Unit = {
+    val data = DataCalcGroupData.getData(configJson.replace("{filePath}", filePath))
+    Assertions.assertNotNull(data)
+
+    val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+    Assertions.assertNotNull(sources)
+    Assertions.assertNotNull(transforms)
+    Assertions.assertNotNull(sinks)
+  }
+
+  val starrocksWriteConfigJson =
+    """
+      |{
+      |  "sources": [
+      |    {
+      |      "name": "file",
+      |      "type": "source",
+      |      "config": {
+      |        "resultTable": "T1654611700631",
+      |        "path": "file://{filePath}/etltest.dolphin",
+      |        "serializer": "csv",
+      |        "options": {
+      |          "header":"true",
+      |          "delimiter":";"
+      |        },
+      |        "columnNames": ["name", "age"]
+      |      }
+      |    }
+      |  ],
+      |  "sinks": [
+      |    {
+      |      "name": "starrocks",
+      |      "type": "sink",
+      |      "config": {
+      |        "sourceTable": "T1654611700631",
+      |        "url": "localhost:8030",
+      |        "jdbcUrl": "jdbc:mysql://localhost:9030",
+      |        "user": "root",
+      |        "password": "",
+      |        "targetDatabase": "test",
+      |        "targetTable": "test"
+      |      }
+      |    }
+      |  ]
+      |}
+      |""".stripMargin
+
+  val starrocksReaderConfigJson =
+    """
+      |{
+      |  "sources": [
+      |    {
+      |      "name": "starrocks",
+      |      "type": "source",
+      |      "config": {
+      |        "resultTable": "T1654611700631",
+      |        "url": "localhost:8030",
+      |        "user": "root",
+      |        "password": "",
+      |        "sourceDatabase": "test",
+      |        "sourceTable": "test"
+      |      }
+      |    }
+      |  ],
+      |  "sinks": [
+      |    {
+      |      "name": "file",
+      |      "type": "sink",
+      |      "config": {
+      |        "sourceTable": "T1654611700631",
+      |        "path": "file://{filePath}/json",
+      |        "saveMode": "overwrite",
+      |        "serializer": "json"
+      |      }
+      |    }
+      |  ]
+      |}
+      |""".stripMargin
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/StarrocksSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/StarrocksSinkConfig.java
new file mode 100644
index 000000000..f95b108be
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/StarrocksSinkConfig.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+
+/**
+ * Connection and target-table settings for the StarRocks DataCalc sink.
+ *
+ * <p>Every field except {@code password} is annotated {@code @NotBlank}.
+ */
+public class StarrocksSinkConfig extends SinkConfig {
+
+  /** StarRocks FE HTTP address, e.g. {@code localhost:8030}. */
+  @NotBlank private String url;
+
+  /** StarRocks FE JDBC URL, e.g. {@code jdbc:mysql://localhost:9030}. */
+  @NotBlank private String jdbcUrl;
+
+  /** User name used to connect to StarRocks. */
+  @NotBlank private String user;
+
+  /** Password for {@code user}; optional, may be left unset. */
+  private String password;
+
+  /** Database that contains {@code targetTable}. */
+  @NotBlank private String targetDatabase;
+
+  /** Table the rows are written into. */
+  @NotBlank private String targetTable;
+
+  public String getUrl() {
+    return this.url;
+  }
+
+  public String getJdbcUrl() {
+    return this.jdbcUrl;
+  }
+
+  public String getUser() {
+    return this.user;
+  }
+
+  public String getPassword() {
+    return this.password;
+  }
+
+  public String getTargetDatabase() {
+    return this.targetDatabase;
+  }
+
+  public String getTargetTable() {
+    return this.targetTable;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public void setJdbcUrl(String jdbcUrl) {
+    this.jdbcUrl = jdbcUrl;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public void setTargetDatabase(String targetDatabase) {
+    this.targetDatabase = targetDatabase;
+  }
+
+  public void setTargetTable(String targetTable) {
+    this.targetTable = targetTable;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/StarrocksSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/StarrocksSourceConfig.java
new file mode 100644
index 000000000..89d23c4d9
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/StarrocksSourceConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+
+/**
+ * Connection and source-table settings for the StarRocks DataCalc source.
+ *
+ * <p>Every field except {@code password} is annotated {@code @NotBlank}.
+ */
+public class StarrocksSourceConfig extends SourceConfig {
+
+  /** StarRocks FE node address, e.g. {@code localhost:8030}. */
+  @NotBlank private String url;
+
+  /** User name used to connect to StarRocks. */
+  @NotBlank private String user;
+
+  /** Password for {@code user}; optional, may be left unset. */
+  private String password;
+
+  /** Database that contains {@code sourceTable}. */
+  @NotBlank private String sourceDatabase;
+
+  /** Table the rows are read from. */
+  @NotBlank private String sourceTable;
+
+  public String getUrl() {
+    return this.url;
+  }
+
+  public String getUser() {
+    return this.user;
+  }
+
+  public String getPassword() {
+    return this.password;
+  }
+
+  public String getSourceDatabase() {
+    return this.sourceDatabase;
+  }
+
+  public String getSourceTable() {
+    return this.sourceTable;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public void setSourceDatabase(String sourceDatabase) {
+    this.sourceDatabase = sourceDatabase;
+  }
+
+  public void setSourceTable(String sourceTable) {
+    this.sourceTable = sourceTable;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
index 28ae3a4c3..e27d110c3 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
@@ -51,6 +51,7 @@ public class PluginUtil {
classMap.put("elasticsearch", ElasticsearchSource.class);
classMap.put("solr", SolrSource.class);
classMap.put("kafka", KafkaSource.class);
+ classMap.put("starrocks", StarrocksSource.class);
return classMap;
}
@@ -73,6 +74,7 @@ public class PluginUtil {
classMap.put("elasticsearch", ElasticsearchSink.class);
classMap.put("solr", SolrSink.class);
classMap.put("kafka", KafkaSink.class);
+ classMap.put("starrocks", StarrocksSink.class);
return classMap;
}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/StarrocksSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/StarrocksSink.scala
new file mode 100644
index 000000000..bda047a9f
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/StarrocksSink.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import scala.collection.JavaConverters._
+
+/**
+ * DataCalc sink that appends a Dataset to a StarRocks table through the `starrocks_writer` data
+ * source.
+ */
+class StarrocksSink extends DataCalcSink[StarrocksSinkConfig] with Logging {
+
+  /**
+   * Writes `ds` to `targetDatabase.targetTable` in append mode.
+   *
+   * Entries of the user-supplied `options` map are forwarded to the writer too; for keys that
+   * appear in both, the connection settings built here take precedence.
+   *
+   * @param spark the active Spark session (unused here)
+   * @param ds    rows to write
+   */
+  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+    // The password is optional in StarrocksSinkConfig: only forward it when it is set so the
+    // writer never receives a null option value.
+    val passwordOption = Option(config.getPassword)
+      .map(password => Map("spark.starrocks.write.password" -> password))
+      .getOrElse(Map.empty[String, String])
+
+    val connectionOptions = Map(
+      "spark.starrocks.conf" -> "write",
+      "spark.starrocks.write.fe.urls.http" -> config.getUrl,
+      "spark.starrocks.write.fe.urls.jdbc" -> config.getJdbcUrl,
+      "spark.starrocks.write.username" -> config.getUser,
+      "spark.starrocks.write.properties.ignore_json_size" -> "true",
+      "spark.starrocks.write.database" -> config.getTargetDatabase,
+      "spark.starrocks.write.table" -> config.getTargetTable
+    ) ++ passwordOption
+
+    val userOptions =
+      Option(config.getOptions).map(_.asScala.toMap).getOrElse(Map.empty[String, String])
+
+    // todo The starrocks connector currently only supports the 'append' mode, using the starrocks 'Primary Key table' to do 'upsert'
+    val writer = ds.write.format("starrocks_writer").mode("append")
+
+    logger.info(
+      s"Save data from starrocks url: ${config.getUrl}, targetDatabase: ${config.getTargetDatabase}, targetTable: ${config.getTargetTable}"
+    )
+    writer.options(userOptions ++ connectionOptions).save()
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/StarrocksSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/StarrocksSource.scala
new file mode 100644
index 000000000..ea2a9fa04
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/StarrocksSource.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+/**
+ * DataCalc source that reads a StarRocks table through the `starrocks` Spark data source.
+ */
+class StarrocksSource extends DataCalcSource[StarrocksSourceConfig] with Logging {
+
+  /**
+   * Loads `sourceDatabase.sourceTable` as a DataFrame.
+   *
+   * Entries of the user-supplied `options` map are applied first, so the connection settings set
+   * afterwards take precedence for any key present in both.
+   *
+   * @param spark session used to create the reader
+   * @return a DataFrame over the configured StarRocks table
+   */
+  override def getData(spark: SparkSession): Dataset[Row] = {
+    val reader = spark.read.format("starrocks")
+
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      reader.options(config.getOptions)
+    }
+
+    logger.info(
+      s"Load data from starrocks url: ${config.getUrl}, sourceDatabase: ${config.getSourceDatabase}, sourceTable: ${config.getSourceTable}"
+    )
+
+    val configured = reader
+      .option("starrocks.table.identifier", s"${config.getSourceDatabase}.${config.getSourceTable}")
+      .option("starrocks.fenodes", config.getUrl)
+      .option("user", config.getUser)
+
+    // The password is optional in StarrocksSourceConfig: only set it when present so the reader
+    // never receives a null option value.
+    Option(config.getPassword)
+      .fold(configured)(password => configured.option("password", password))
+      .load()
+  }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org