You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wu...@apache.org on 2022/03/09 05:13:11 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector] Add iceberg spark batch sink (#1439)

This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 41ea851  [Feature][Connector] Add iceberg spark batch sink (#1439)
41ea851 is described below

commit 41ea851fd4ae41ab35fc451ba461cdecf2879000
Author: ououtt <73...@qq.com>
AuthorDate: Wed Mar 9 13:13:07 2022 +0800

    [Feature][Connector] Add iceberg spark batch sink (#1439)
    
    * add iceberg spark batch sink
    
    * fix iceberg runtime scope
    
    * fix license
    
    * fix license
    
    * Set savemode to append by default
    
    Co-authored-by: eye <ey...@aloudata.com>
---
 .../en/spark/configuration/sink-plugins/Iceberg.md | 61 ++++++++++++++++++++++
 pom.xml                                            |  1 +
 .../org.apache.seatunnel.spark.BaseSparkSink       | 19 +++++++
 .../seatunnel/spark/{source => sink}/Iceberg.scala | 34 +++++++-----
 .../apache/seatunnel/spark/source/Iceberg.scala    |  7 +--
 seatunnel-dist/release-docs/LICENSE                |  1 -
 tools/dependencies/known-dependencies.txt          |  1 -
 7 files changed, 105 insertions(+), 19 deletions(-)

diff --git a/docs/en/spark/configuration/sink-plugins/Iceberg.md b/docs/en/spark/configuration/sink-plugins/Iceberg.md
new file mode 100644
index 0000000..5081859
--- /dev/null
+++ b/docs/en/spark/configuration/sink-plugins/Iceberg.md
@@ -0,0 +1,61 @@
+# Iceberg
+
+> Sink plugin: Iceberg [Spark]
+
+## Description
+
+Write data to Iceberg.
+
+## Options
+
+| name                                                         | type   | required | default value |
+| ------------------------------------------------------------ | ------ | -------- | ------------- |
+| [path](#path)                                                | string | yes      | -             |
+| [saveMode](#saveMode)                                        | string | no       | append        |
+| [target-file-size-bytes](#target-file-size-bytes)            | long   | no       | -             |
+| [check-nullability](#check-nullability)                      | bool   | no       | -             |
+| [snapshot-property.custom-key](#snapshot-property.custom-key)| string | no       | -             |
+| [fanout-enabled](#fanout-enabled)                            | bool   | no       | -             |
+| [check-ordering](#check-ordering)                            | bool   | no       | -             |
+
+
+Refer to [iceberg write options](https://iceberg.apache.org/docs/latest/spark-configuration/) for more configurations.
+
+### path
+
+Iceberg table location.
+
+### saveMode
+
+Save mode, either `append` or `overwrite`; these are the only two modes supported by Iceberg. The default value is `append`.
+
+### target-file-size-bytes
+
+Overrides this table’s write.target-file-size-bytes
+
+### check-nullability
+
+Sets the nullable check on fields
+
+### snapshot-property.custom-key
+
+Adds an entry with custom-key and corresponding value in the snapshot summary,
+e.g. `snapshot-property.aaaa = "bbbb"`
+
+### fanout-enabled
+
+Overrides this table’s write.spark.fanout.enabled
+
+### check-ordering
+
+Checks whether the input schema and the table schema are the same
+
+## Example
+
+```bash
+iceberg {
+    path = "hdfs://localhost:9000/iceberg/warehouse/db/table"
+}
+```
+
+
diff --git a/pom.xml b/pom.xml
index 23651bc..70a221c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -510,6 +510,7 @@
                 <groupId>org.apache.iceberg</groupId>
                 <artifactId>iceberg-spark-runtime</artifactId>
                 <version>${iceberg.version}</version>
+                <scope>provided</scope>
             </dependency>
         </dependencies>
     </dependencyManagement>
diff --git a/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
new file mode 100644
index 0000000..0e673b2
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.seatunnel.spark.sink.Iceberg
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/sink/Iceberg.scala
similarity index 59%
copy from seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala
copy to seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/sink/Iceberg.scala
index 53134d5..438ef9d 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/sink/Iceberg.scala
@@ -14,37 +14,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.spark.source
+package org.apache.seatunnel.spark.sink
 
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueType
+import org.apache.seatunnel.shade.com.typesafe.config.{ConfigFactory, ConfigValueType}
 import org.apache.seatunnel.spark.SparkEnvironment
-import org.apache.seatunnel.spark.batch.SparkBatchSource
+import org.apache.seatunnel.spark.batch.SparkBatchSink
 import org.apache.spark.sql.{Dataset, Row}
 
 import scala.collection.JavaConversions._
 
-class Iceberg extends SparkBatchSource {
-  override def getData(env: SparkEnvironment): Dataset[Row] = {
-    val reader = env.getSparkSession.read.format("iceberg")
+class Iceberg extends SparkBatchSink {
+
+  override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
+    val writer = data.write.format("iceberg")
     for (e <- config.entrySet()) {
       e.getValue.valueType match {
         case ConfigValueType.NUMBER =>
-          reader.option(e.getKey, Long.unbox(e.getValue.unwrapped()))
+          writer.option(e.getKey, config.getLong(e.getKey))
         case ConfigValueType.BOOLEAN =>
-          reader.option(e.getKey, Boolean.unbox(e.getValue.unwrapped()))
-        case ConfigValueType.STRING => reader.option(e.getKey, e.getValue.unwrapped().toString)
+          writer.option(e.getKey, config.getBoolean(e.getKey))
+        case ConfigValueType.STRING =>
+          writer.option(e.getKey, config.getString(e.getKey))
       }
     }
-    val df = reader.load(config.getString("path"))
-    df.createOrReplaceTempView(config.getString("result_table_name"))
-    env.getSparkSession.sql(config.getString("pre_sql"))
+    writer.mode(config.getString("saveMode"))
+      .save(config.getString("path"))
   }
 
   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "path", "pre_sql")
+    checkAllExists(config, "path")
   }
 
-  override def prepare(prepareEnv: SparkEnvironment): Unit = {}
+  override def prepare(prepareEnv: SparkEnvironment): Unit = {
+    val defaultConfig = ConfigFactory.parseMap(
+      Map(
+        "saveMode" -> "append"))
+    config = config.withFallback(defaultConfig)
+  }
 }
diff --git a/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala
index 53134d5..e5a60a7 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala
@@ -31,10 +31,11 @@ class Iceberg extends SparkBatchSource {
     for (e <- config.entrySet()) {
       e.getValue.valueType match {
         case ConfigValueType.NUMBER =>
-          reader.option(e.getKey, Long.unbox(e.getValue.unwrapped()))
+          reader.option(e.getKey, config.getLong(e.getKey))
         case ConfigValueType.BOOLEAN =>
-          reader.option(e.getKey, Boolean.unbox(e.getValue.unwrapped()))
-        case ConfigValueType.STRING => reader.option(e.getKey, e.getValue.unwrapped().toString)
+          reader.option(e.getKey, config.getBoolean(e.getKey))
+        case ConfigValueType.STRING =>
+          reader.option(e.getKey, config.getString(e.getKey))
       }
     }
     val df = reader.load(config.getString("path"))
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index c2dc02c..23a9f9a 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -604,7 +604,6 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) Apache Iceberg (org.apache.iceberg:iceberg-bundled-guava:0.13.1 - https://iceberg.apache.org)
      (The Apache Software License, Version 2.0) Apache Iceberg (org.apache.iceberg:iceberg-common:0.13.1 - https://iceberg.apache.org)
      (The Apache Software License, Version 2.0) Apache Iceberg (org.apache.iceberg:iceberg-core:0.13.1 - https://iceberg.apache.org)
-     (The Apache Software License, Version 2.0) Apache Iceberg (org.apache.iceberg:iceberg-spark-runtime:0.13.1 - https://iceberg.apache.org)
      (The Apache Software License, Version 2.0) Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/)
      (The Apache Software License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:2.0.0 - http://kafka.apache.org)
      (The Apache Software License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:2.4.1 - https://kafka.apache.org)
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 3608017..75a7f53 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -657,7 +657,6 @@ iceberg-core-0.13.1.jar
 iceberg-api-0.13.1.jar
 iceberg-common-0.13.1.jar
 iceberg-bundled-guava-0.13.1.jar
-iceberg-spark-runtime-0.13.1.jar
 caffeine-2.8.4.jar
 checker-qual-3.4.0.jar
 error_prone_annotations-2.3.4.jar