You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/06/19 02:54:46 UTC

[linkis] branch dev-1.4.0 updated: Spark etl support starrocks (#4650)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new c3cfc90de Spark etl support starrocks (#4650)
c3cfc90de is described below

commit c3cfc90de715f127477fd9284cbae843ec2532e9
Author: ChengJie1053 <18...@163.com>
AuthorDate: Mon Jun 19 10:54:40 2023 +0800

    Spark etl support starrocks (#4650)
---
 .../spark/datacalc/TestStarrocksCala.scala         | 128 +++++++++++++++++++++
 .../spark/datacalc/sink/StarrocksSinkConfig.java   |  85 ++++++++++++++
 .../datacalc/source/StarrocksSourceConfig.java     |  73 ++++++++++++
 .../spark/datacalc/util/PluginUtil.java            |   2 +
 .../spark/datacalc/sink/StarrocksSink.scala        |  54 +++++++++
 .../spark/datacalc/source/StarrocksSource.scala    |  49 ++++++++
 6 files changed, 391 insertions(+)

diff --git a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestStarrocksCala.scala b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestStarrocksCala.scala
new file mode 100644
index 000000000..54aada34c
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestStarrocksCala.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.common.io.FsPath
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData
+
+import org.junit.jupiter.api.{Assertions, Test};
+
+/**
+ * Tests that data-calc job descriptions using the "starrocks" plugin can be parsed and that
+ * their source/transform/sink plugins can be resolved.
+ *
+ * The tests only call `DataCalcGroupData.getData` and `DataCalcExecution.getPlugins` and check
+ * that the results are non-null.
+ */
+class TestStarrocksCala {
+
+  // Root of the test classpath; substituted for the "{filePath}" placeholder in the JSON below.
+  val filePath: String = this.getClass.getResource("/").getFile
+
+  @Test
+  def testStarrocksWrite: Unit = {
+    // todo The starrocks connector currently only supports the 'append' mode, using the starrocks 'Primary Key table' to do 'upsert'
+    assertPluginsResolved(starrocksWriteConfigJson)
+  }
+
+  @Test
+  def testStarrocksReader: Unit = {
+    assertPluginsResolved(starrocksReaderConfigJson)
+  }
+
+  /**
+   * Parses `configJson` (after replacing "{filePath}") and asserts that the parsed job and each
+   * of its plugin groups are non-null. Does nothing on Windows.
+   *
+   * @param configJson
+   *   job description containing the "{filePath}" placeholder
+   */
+  private def assertPluginsResolved(configJson: String): Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      val data = DataCalcGroupData.getData(configJson.replace("{filePath}", filePath))
+      Assertions.assertNotNull(data)
+
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertNotNull(sources)
+      Assertions.assertNotNull(transforms)
+      Assertions.assertNotNull(sinks)
+    }
+  }
+
+  // Job description: CSV file source -> starrocks sink.
+  val starrocksWriteConfigJson =
+    """
+      |{
+      |    "sources": [
+      |        {
+      |            "name": "file",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "path": "file://{filePath}/etltest.dolphin",
+      |                "serializer": "csv",
+      |                "options": {
+      |                "header":"true",
+      |                "delimiter":";"
+      |                },
+      |                "columnNames": ["name", "age"]
+      |            }
+      |        }
+      |    ],
+      |    "sinks": [
+      |        {
+      |            "name": "starrocks",
+      |            "type": "sink",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "url": "localhost:8030",
+      |                "jdbcUrl": "jdbc:mysql://localhost:9030",
+      |                "user": "root",
+      |                "password": "",
+      |                "targetDatabase": "test",
+      |                "targetTable": "test"
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
+  // Job description: starrocks source -> JSON file sink.
+  val starrocksReaderConfigJson =
+    """
+      |{
+      |    "sources": [
+      |        {
+      |            "name": "starrocks",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "url": "localhost:8030",
+      |                "user": "root",
+      |                "password": "",
+      |                "sourceDatabase": "test",
+      |                "sourceTable": "test"
+      |            }
+      |        }
+      |    ],
+      |    "sinks": [
+      |        {
+      |            "name": "file",
+      |            "type": "sink",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "path": "file://{filePath}/json",
+      |                "saveMode": "overwrite",
+      |                "serializer": "json"
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/StarrocksSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/StarrocksSinkConfig.java
new file mode 100644
index 000000000..f95b108be
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/StarrocksSinkConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+
+/**
+ * Configuration bean for the StarRocks sink plugin ({@code StarrocksSink}).
+ *
+ * <p>The sink forwards each property below as a {@code spark.starrocks.write.*} writer option.
+ * Properties annotated with {@code @NotBlank} are declared mandatory; {@code password} carries no
+ * constraint.
+ */
+public class StarrocksSinkConfig extends SinkConfig {
+
+  // Forwarded as "spark.starrocks.write.fe.urls.http" (the test config uses "localhost:8030").
+  @NotBlank private String url;
+
+  // Forwarded as "spark.starrocks.write.fe.urls.jdbc" (e.g. "jdbc:mysql://localhost:9030").
+  @NotBlank private String jdbcUrl;
+
+  // Forwarded as "spark.starrocks.write.username".
+  @NotBlank private String user;
+
+  // Forwarded as "spark.starrocks.write.password"; not constrained, so it may be empty.
+  private String password;
+
+  // Forwarded as "spark.starrocks.write.database".
+  @NotBlank private String targetDatabase;
+
+  // Forwarded as "spark.starrocks.write.table".
+  @NotBlank private String targetTable;
+
+  public String getUrl() {
+    return url;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public String getJdbcUrl() {
+    return jdbcUrl;
+  }
+
+  public void setJdbcUrl(String jdbcUrl) {
+    this.jdbcUrl = jdbcUrl;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public String getTargetDatabase() {
+    return targetDatabase;
+  }
+
+  public void setTargetDatabase(String targetDatabase) {
+    this.targetDatabase = targetDatabase;
+  }
+
+  public String getTargetTable() {
+    return targetTable;
+  }
+
+  public void setTargetTable(String targetTable) {
+    this.targetTable = targetTable;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/StarrocksSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/StarrocksSourceConfig.java
new file mode 100644
index 000000000..89d23c4d9
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/StarrocksSourceConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+
+/**
+ * Configuration bean for the StarRocks source plugin ({@code StarrocksSource}).
+ *
+ * <p>The source forwards these properties as reader options of the "starrocks" Spark data source.
+ * Properties annotated with {@code @NotBlank} are declared mandatory; {@code password} carries no
+ * constraint.
+ */
+public class StarrocksSourceConfig extends SourceConfig {
+
+  // Forwarded as "starrocks.fenodes" (the test config uses "localhost:8030").
+  @NotBlank private String url;
+  // Forwarded as the "user" reader option.
+  @NotBlank private String user;
+  // Forwarded as the "password" reader option; not constrained, so it may be empty.
+  private String password;
+
+  // Database half of "starrocks.table.identifier" ("<sourceDatabase>.<sourceTable>").
+  @NotBlank private String sourceDatabase;
+
+  // Table half of "starrocks.table.identifier" ("<sourceDatabase>.<sourceTable>").
+  @NotBlank private String sourceTable;
+
+  public String getUrl() {
+    return url;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public String getSourceDatabase() {
+    return sourceDatabase;
+  }
+
+  public void setSourceDatabase(String sourceDatabase) {
+    this.sourceDatabase = sourceDatabase;
+  }
+
+  public String getSourceTable() {
+    return sourceTable;
+  }
+
+  public void setSourceTable(String sourceTable) {
+    this.sourceTable = sourceTable;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
index 28ae3a4c3..e27d110c3 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
@@ -51,6 +51,7 @@ public class PluginUtil {
     classMap.put("elasticsearch", ElasticsearchSource.class);
     classMap.put("solr", SolrSource.class);
     classMap.put("kafka", KafkaSource.class);
+    classMap.put("starrocks", StarrocksSource.class);
     return classMap;
   }
 
@@ -73,6 +74,7 @@ public class PluginUtil {
     classMap.put("elasticsearch", ElasticsearchSink.class);
     classMap.put("solr", SolrSink.class);
     classMap.put("kafka", KafkaSink.class);
+    classMap.put("starrocks", StarrocksSink.class);
     return classMap;
   }
 
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/StarrocksSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/StarrocksSink.scala
new file mode 100644
index 000000000..bda047a9f
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/StarrocksSink.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Data-calc sink that writes a dataset to StarRocks through the "starrocks_writer" Spark data
+ * source, always in "append" save mode.
+ */
+class StarrocksSink extends DataCalcSink[StarrocksSinkConfig] with Logging {
+
+  /**
+   * Writes `ds` to the table named by the sink config.
+   *
+   * @param spark
+   *   active session (not used by this sink)
+   * @param ds
+   *   rows to append to the target table
+   */
+  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+    // Settings derived from the typed config. The password is optional in the config, so an
+    // unset (null) value is sent as an empty string rather than as a null option value.
+    val connectionOptions = Map(
+      "spark.starrocks.conf" -> "write",
+      "spark.starrocks.write.fe.urls.http" -> config.getUrl,
+      "spark.starrocks.write.fe.urls.jdbc" -> config.getJdbcUrl,
+      "spark.starrocks.write.username" -> config.getUser,
+      "spark.starrocks.write.password" -> Option(config.getPassword).getOrElse(""),
+      "spark.starrocks.write.properties.ignore_json_size" -> "true",
+      "spark.starrocks.write.database" -> config.getTargetDatabase,
+      "spark.starrocks.write.table" -> config.getTargetTable
+    )
+
+    // Free-form user options go first so that the typed settings above win on key clashes.
+    val userOptions =
+      Option(config.getOptions).map(_.asScala.toMap).getOrElse(Map.empty[String, String])
+    val options = userOptions ++ connectionOptions
+
+    // todo The starrocks connector currently only supports the 'append' mode, using the starrocks 'Primary Key table' to do 'upsert'
+    val writer = ds.write.format("starrocks_writer").mode("append")
+
+    logger.info(
+      s"Save data to starrocks url: ${config.getUrl}, targetDatabase: ${config.getTargetDatabase}, targetTable: ${config.getTargetTable}"
+    )
+    writer.options(options).save()
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/StarrocksSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/StarrocksSource.scala
new file mode 100644
index 000000000..ea2a9fa04
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/StarrocksSource.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+/**
+ * Data-calc source that loads a StarRocks table through the "starrocks" Spark data source.
+ */
+class StarrocksSource extends DataCalcSource[StarrocksSourceConfig] with Logging {
+
+  /**
+   * Loads the table `<sourceDatabase>.<sourceTable>` named by the source config.
+   *
+   * @param spark
+   *   session used to create the reader
+   * @return
+   *   the dataset returned by the reader's `load()`
+   */
+  override def getData(spark: SparkSession): Dataset[Row] = {
+    val reader = spark.read.format("starrocks")
+
+    // Free-form user options are applied first; the typed settings below are set afterwards and
+    // therefore win on key clashes.
+    Option(config.getOptions).filterNot(_.isEmpty).foreach(opts => reader.options(opts))
+
+    logger.info(
+      s"Load data from starrocks url: ${config.getUrl}, sourceDatabase: ${config.getSourceDatabase}, sourceTable: ${config.getSourceTable}"
+    )
+
+    reader
+      .option(
+        "starrocks.table.identifier",
+        s"${config.getSourceDatabase}.${config.getSourceTable}"
+      )
+      .option("starrocks.fenodes", config.getUrl)
+      .option("user", config.getUser)
+      // The password is optional in the config; send an empty string instead of a null value.
+      .option("password", Option(config.getPassword).getOrElse(""))
+      .load()
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org