You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/06/27 14:13:19 UTC
[linkis] branch master updated: Spark etl support es upsert (#4707)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 4fa3728e9 Spark etl support es upsert (#4707)
4fa3728e9 is described below
commit 4fa3728e94d2f0980149a82021e5717ece2f6574
Author: ChengJie1053 <18...@163.com>
AuthorDate: Tue Jun 27 22:13:08 2023 +0800
Spark etl support es upsert (#4707)
---
.../spark/datacalc/TestElasticsearchCala.scala | 58 +++++++++++++++++++++-
.../spark/datacalc/TestMongoCala.scala | 4 +-
.../exception/ElasticsearchSinkException.java | 33 ++++++++++++
.../datacalc/sink/ElasticsearchSinkConfig.java | 14 +++++-
.../spark/datacalc/sink/ElasticsearchSink.scala | 14 ++++++
5 files changed, 117 insertions(+), 6 deletions(-)
diff --git a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala
index 0ea4ab4cc..94e14ef39 100644
--- a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala
+++ b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala
@@ -27,7 +27,7 @@ class TestElasticsearchCala {
val filePath = this.getClass.getResource("/").getFile
@Test
- def testExcelWrite: Unit = {
+ def testElasticsearchWrite: Unit = {
// skip os: windows
if (!FsPath.WINDOWS) {
val data =
@@ -42,7 +42,26 @@ class TestElasticsearchCala {
}
@Test
- def testExcelReader: Unit = {
+ def testElasticsearchWriteByUpsert: Unit = {
+ // skip os: windows
+ if (!FsPath.WINDOWS) {
+ val data =
+ DataCalcGroupData.getData(
+ elasticsearchWriteConfigJsonByUpsert.replace("{filePath}", filePath)
+ )
+ Assertions.assertTrue(data != null)
+
+ val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+ Assertions.assertTrue(sources != null)
+ Assertions.assertTrue(transforms != null)
+ Assertions.assertTrue(sinks != null)
+ // the upsert config declares exactly one (elasticsearch) sink
+ Assertions.assertEquals(1, sinks.length)
+ }
+ }
+
+ @Test
+ def testElasticsearchReader: Unit = {
// skip os: windows
if (!FsPath.WINDOWS) {
val data =
@@ -90,6 +109,41 @@ class TestElasticsearchCala {
|}
|""".stripMargin
+ val elasticsearchWriteConfigJsonByUpsert = // CSV file source feeding an elasticsearch sink with saveMode "upsert"; "age" is the primaryKey
+ """
+ |{
+ | "sources": [
+ | {
+ | "name": "file",
+ | "type": "source",
+ | "config": {
+ | "resultTable": "T1654611700631",
+ | "path": "file://{filePath}/etltest.dolphin",
+ | "serializer": "csv",
+ | "options": {
+ | "header":"true",
+ | "delimiter":";"
+ | },
+ | "columnNames": ["name", "age"]
+ | }
+ | }
+ | ],
+ | "sinks": [
+ | {
+ | "name": "elasticsearch",
+ | "config": {
+ | "sourceTable": "T1654611700631",
+ | "node": "localhost",
+ | "port": "9200",
+ | "index": "estest",
+ | "primaryKey": "age",
+ | "saveMode": "upsert"
+ | }
+ | }
+ | ]
+ |}
+ |""".stripMargin
+
val elasticsearchReaderConfigJson =
"""
|{
diff --git a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala
index a7fda8666..c8bf8cc47 100644
--- a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala
+++ b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala
@@ -27,7 +27,7 @@ class TestMongoCala {
val filePath = this.getClass.getResource("/").getFile
@Test
- def testExcelWrite: Unit = {
+ def testMongoWrite: Unit = {
// skip os: windows
if (!FsPath.WINDOWS) {
val data = DataCalcGroupData.getData(mongoWriteConfigJson.replace("{filePath}", filePath))
@@ -41,7 +41,7 @@ class TestMongoCala {
}
@Test
- def testExcelReader: Unit = {
+ def testMongoReader: Unit = {
// skip os: windows
if (!FsPath.WINDOWS) {
val data = DataCalcGroupData.getData(mongoReaderConfigJson.replace("{filePath}", filePath))
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/ElasticsearchSinkException.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/ElasticsearchSinkException.java
new file mode 100644
index 000000000..0d77d6ab7
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/ElasticsearchSinkException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.exception;
+
+import org.apache.linkis.common.exception.ExceptionLevel;
+import org.apache.linkis.common.exception.LinkisRuntimeException;
+
+public class ElasticsearchSinkException extends LinkisRuntimeException {
+
+ public ElasticsearchSinkException(int errCode, String desc) {
+ super(errCode, desc);
+ }
+
+ @Override
+ public ExceptionLevel getLevel() {
+ return ExceptionLevel.ERROR;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java
index 64e69e498..9f4f26e64 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java
@@ -36,13 +36,23 @@ public class ElasticsearchSinkConfig extends SinkConfig {
private String password = "";
+ private String primaryKey = ""; // document-id column (es.mapping.id) for saveMode "upsert"; must then be non-blank
+
@NotBlank
@Pattern(
- regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+ regexp = "^(overwrite|append|upsert|ignore|error|errorifexists)$", // ElasticsearchSink maps "upsert" to es.write.operation=upsert
message =
- "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+ "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'upsert', 'ignore', 'error', 'errorifexists'.")
private String saveMode = "overwrite";
+ public String getPrimaryKey() { // document-id column used when saveMode is "upsert"
+ return this.primaryKey;
+ }
+
+ public void setPrimaryKey(String key) { // a blank value is rejected by ElasticsearchSink for saveMode "upsert"
+ this.primaryKey = key;
+ }
+
public String getUser() {
return user;
}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala
index 9350cd022..2e448c3e1 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala
@@ -19,6 +19,8 @@ package org.apache.linkis.engineplugin.spark.datacalc.sink
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+import org.apache.linkis.engineplugin.spark.datacalc.exception.ElasticsearchSinkException
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Dataset, Row, SparkSession}
@@ -41,6 +43,18 @@ class ElasticsearchSink extends DataCalcSink[ElasticsearchSinkConfig] with Loggi
options = config.getOptions.asScala.toMap ++ options
}
+ if (config.getSaveMode.equalsIgnoreCase("upsert")) {
+ if (StringUtils.isBlank(config.getPrimaryKey)) {
+ throw new ElasticsearchSinkException(
+ SparkErrorCodeSummary.DATA_CALC_VARIABLE_NOT_EXIST.getErrorCode,
+ "saveMode is upsert, please set elasticsearch mapping [primaryKey] in variables"
+ )
+ }
+ options =
+ options ++ Map("es.write.operation" -> "upsert", "es.mapping.id" -> config.getPrimaryKey)
+ config.setSaveMode("append")
+ }
+
val writer = ds.write.format("org.elasticsearch.spark.sql")
if (StringUtils.isNotBlank(config.getSaveMode)) {
writer.mode(config.getSaveMode)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org