You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/06/27 14:13:19 UTC

[linkis] branch master updated: Spark etl support es upsert (#4707)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fa3728e9 Spark etl support es upsert (#4707)
4fa3728e9 is described below

commit 4fa3728e94d2f0980149a82021e5717ece2f6574
Author: ChengJie1053 <18...@163.com>
AuthorDate: Tue Jun 27 22:13:08 2023 +0800

    Spark etl support es upsert (#4707)
---
 .../spark/datacalc/TestElasticsearchCala.scala     | 58 +++++++++++++++++++++-
 .../spark/datacalc/TestMongoCala.scala             |  4 +-
 .../exception/ElasticsearchSinkException.java      | 33 ++++++++++++
 .../datacalc/sink/ElasticsearchSinkConfig.java     | 14 +++++-
 .../spark/datacalc/sink/ElasticsearchSink.scala    | 14 ++++++
 5 files changed, 117 insertions(+), 6 deletions(-)

diff --git a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala
index 0ea4ab4cc..94e14ef39 100644
--- a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala
+++ b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestElasticsearchCala.scala
@@ -27,7 +27,7 @@ class TestElasticsearchCala {
   val filePath = this.getClass.getResource("/").getFile
 
   @Test
-  def testExcelWrite: Unit = {
+  def testElasticsearchWrite: Unit = {
     // skip os: windows
     if (!FsPath.WINDOWS) {
       val data =
@@ -42,7 +42,26 @@ class TestElasticsearchCala {
   }
 
   @Test
-  def testExcelReader: Unit = {
+  def testElasticsearchWriteByUpsert: Unit = {
+    // Parses the upsert sink config and checks that sources, transforms and sinks resolve;
+    // the job itself is not executed here. Skipped on Windows, like the other tests.
+    if (!FsPath.WINDOWS) {
+      val data =
+        DataCalcGroupData.getData(
+          elasticsearchWriteConfigJsonByUpsert.replace("{filePath}", filePath)
+        )
+      Assertions.assertTrue(data != null)
+
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      // Each plugin group must be resolved from the parsed config.
+      Assertions.assertTrue(sources != null)
+      Assertions.assertTrue(transforms != null)
+      Assertions.assertTrue(sinks != null)
+    }
+  }
+
+  @Test
+  def testElasticsearchReader: Unit = {
     // skip os: windows
     if (!FsPath.WINDOWS) {
       val data =
@@ -90,6 +109,41 @@ class TestElasticsearchCala {
       |}
       |""".stripMargin
 
+  // Job config fixture for the upsert test: one CSV file source (header row, ';' delimiter,
+  // columns name/age) registered as table T1654611700631, and one elasticsearch sink writing
+  // that table to index "estest" on localhost:9200 with saveMode "upsert" and primaryKey "age".
+  // The "{filePath}" placeholder is substituted by the test before parsing.
+  val elasticsearchWriteConfigJsonByUpsert =
+    """
+      |{
+      |    "sources": [
+      |        {
+      |            "name": "file",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "path": "file://{filePath}/etltest.dolphin",
+      |                "serializer": "csv",
+      |                "options": {
+      |                "header":"true",
+      |                "delimiter":";"
+      |                },
+      |                "columnNames": ["name", "age"]
+      |            }
+      |        }
+      |    ],
+      |    "sinks": [
+      |        {
+      |            "name": "elasticsearch",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "node": "localhost",
+      |                "port": "9200",
+      |                "index": "estest",
+      |                "primaryKey": "age",
+      |                "saveMode": "upsert"
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
   val elasticsearchReaderConfigJson =
     """
       |{
diff --git a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala
index a7fda8666..c8bf8cc47 100644
--- a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala
+++ b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestMongoCala.scala
@@ -27,7 +27,7 @@ class TestMongoCala {
   val filePath = this.getClass.getResource("/").getFile
 
   @Test
-  def testExcelWrite: Unit = {
+  def testMongoWrite: Unit = {
     // skip os: windows
     if (!FsPath.WINDOWS) {
       val data = DataCalcGroupData.getData(mongoWriteConfigJson.replace("{filePath}", filePath))
@@ -41,7 +41,7 @@ class TestMongoCala {
   }
 
   @Test
-  def testExcelReader: Unit = {
+  def testMongoReader: Unit = {
     // skip os: windows
     if (!FsPath.WINDOWS) {
       val data = DataCalcGroupData.getData(mongoReaderConfigJson.replace("{filePath}", filePath))
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/ElasticsearchSinkException.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/ElasticsearchSinkException.java
new file mode 100644
index 000000000..0d77d6ab7
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/ElasticsearchSinkException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.exception;
+
+import org.apache.linkis.common.exception.ExceptionLevel;
+import org.apache.linkis.common.exception.LinkisRuntimeException;
+
+/**
+ * Runtime exception for failures in the Elasticsearch data-calc sink.
+ *
+ * <p>{@code ElasticsearchSink} throws it when {@code saveMode} is {@code upsert} but the
+ * configured {@code primaryKey} is blank.
+ */
+public class ElasticsearchSinkException extends LinkisRuntimeException {
+
+  /**
+   * @param errCode numeric error code, forwarded to {@link LinkisRuntimeException}
+   * @param desc description of the failure, forwarded to {@link LinkisRuntimeException}
+   */
+  public ElasticsearchSinkException(int errCode, String desc) {
+    super(errCode, desc);
+  }
+
+  /** Always reports {@link ExceptionLevel#ERROR}. */
+  @Override
+  public ExceptionLevel getLevel() {
+    return ExceptionLevel.ERROR;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java
index 64e69e498..9f4f26e64 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSinkConfig.java
@@ -36,13 +36,23 @@ public class ElasticsearchSinkConfig extends SinkConfig {
 
   private String password = "";
 
+  private String primaryKey = "";
+
   @NotBlank
   @Pattern(
-      regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+      regexp = "^(overwrite|append|upsert|ignore|error|errorifexists)$",
       message =
-          "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+          "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append','upsert', 'ignore', 'error', 'errorifexists'.")
   private String saveMode = "overwrite";
 
+  /**
+   * Returns the primary key setting (empty string by default). When {@code saveMode} is
+   * {@code upsert}, {@code ElasticsearchSink} passes this value as the {@code es.mapping.id}
+   * option and rejects a blank value.
+   */
+  public String getPrimaryKey() {
+    return primaryKey;
+  }
+
+  /**
+   * Sets the primary key setting; the value is stored as given, with no validation here.
+   *
+   * @param primaryKey value later supplied as {@code es.mapping.id} for upsert writes
+   */
+  public void setPrimaryKey(String primaryKey) {
+    this.primaryKey = primaryKey;
+  }
+
   public String getUser() {
     return user;
   }
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala
index 9350cd022..2e448c3e1 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ElasticsearchSink.scala
@@ -19,6 +19,8 @@ package org.apache.linkis.engineplugin.spark.datacalc.sink
 
 import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+import org.apache.linkis.engineplugin.spark.datacalc.exception.ElasticsearchSinkException
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
@@ -41,6 +43,18 @@ class ElasticsearchSink extends DataCalcSink[ElasticsearchSinkConfig] with Loggi
       options = config.getOptions.asScala.toMap ++ options
     }
 
+    if (config.getSaveMode.equalsIgnoreCase("upsert")) {
+      if (StringUtils.isBlank(config.getPrimaryKey)) {
+        throw new ElasticsearchSinkException(
+          SparkErrorCodeSummary.DATA_CALC_VARIABLE_NOT_EXIST.getErrorCode,
+          "saveMode is upsert, please set elasticsearch mapping [primaryKey] in variables"
+        )
+      }
+      options =
+        options ++ Map("es.write.operation" -> "upsert", "es.mapping.id" -> config.getPrimaryKey)
+      config.setSaveMode("append")
+    }
+
     val writer = ds.write.format("org.elasticsearch.spark.sql")
     if (StringUtils.isNotBlank(config.getSaveMode)) {
       writer.mode(config.getSaveMode)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org