You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wu...@apache.org on 2022/03/09 05:13:11 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector] Add iceberg spark batch sink (#1439)
This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 41ea851 [Feature][Connector] Add iceberg spark batch sink (#1439)
41ea851 is described below
commit 41ea851fd4ae41ab35fc451ba461cdecf2879000
Author: ououtt <73...@qq.com>
AuthorDate: Wed Mar 9 13:13:07 2022 +0800
[Feature][Connector] Add iceberg spark batch sink (#1439)
* add iceberg spark batch sink
* fix iceberg runtime scope
* fix license
* fix license
* Set savemode to append by default
Co-authored-by: eye <ey...@aloudata.com>
---
.../en/spark/configuration/sink-plugins/Iceberg.md | 61 ++++++++++++++++++++++
pom.xml | 1 +
.../org.apache.seatunnel.spark.BaseSparkSink | 19 +++++++
.../seatunnel/spark/{source => sink}/Iceberg.scala | 34 +++++++-----
.../apache/seatunnel/spark/source/Iceberg.scala | 7 +--
seatunnel-dist/release-docs/LICENSE | 1 -
tools/dependencies/known-dependencies.txt | 1 -
7 files changed, 105 insertions(+), 19 deletions(-)
diff --git a/docs/en/spark/configuration/sink-plugins/Iceberg.md b/docs/en/spark/configuration/sink-plugins/Iceberg.md
new file mode 100644
index 0000000..5081859
--- /dev/null
+++ b/docs/en/spark/configuration/sink-plugins/Iceberg.md
@@ -0,0 +1,61 @@
+# Iceberg
+
+> Sink plugin: Iceberg [Spark]
+
+## Description
+
+Write data to Iceberg.
+
+## Options
+
+| name | type | required | default value |
+| ------------------------------------------------------------ | ------ | -------- | ------------- |
+| [path](#path) | string | yes | - |
+| [saveMode](#savemode) | string | no | append |
+| [target-file-size-bytes](#target-file-size-bytes) | long | no | - |
+| [check-nullability](#check-nullability) | bool | no | - |
+| [snapshot-property.custom-key](#snapshot-propertycustom-key) | string | no | - |
+| [fanout-enabled](#fanout-enabled) | bool | no | - |
+| [check-ordering](#check-ordering) | bool | no | - |
+
+
+Refer to [Iceberg write options](https://iceberg.apache.org/docs/latest/spark-configuration/) for the full list of supported write options.
+
+### path
+
+Iceberg table location.
+
+### saveMode
+
+Either `append` or `overwrite`; these are the only two save modes Iceberg supports. The default value is `append`.
+
+### target-file-size-bytes
+
+Overrides this table's `write.target-file-size-bytes` property.
+
+### check-nullability
+
+Enables or disables the nullability check on fields.
+
+### snapshot-property.custom-key
+
+Adds an entry with `custom-key` and the corresponding value to the snapshot summary,
+e.g. `snapshot-property.aaaa = "bbbb"`.
+
+### fanout-enabled
+
+Overrides this table's `write.spark.fanout.enabled` property.
+
+### check-ordering
+
+Checks whether the input schema and the table schema are the same.
+
+## Example
+
+```bash
+iceberg {
+  path = "hdfs://localhost:9000/iceberg/warehouse/db/table"
+}
+```
+
+
diff --git a/pom.xml b/pom.xml
index 23651bc..70a221c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -510,6 +510,7 @@
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime</artifactId>
<version>${iceberg.version}</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>
diff --git a/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
new file mode 100644
index 0000000..0e673b2
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSink
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.seatunnel.spark.sink.Iceberg
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/sink/Iceberg.scala
similarity index 59%
copy from seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala
copy to seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/sink/Iceberg.scala
index 53134d5..438ef9d 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/sink/Iceberg.scala
@@ -14,37 +14,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.spark.source
+package org.apache.seatunnel.spark.sink
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueType
+import org.apache.seatunnel.shade.com.typesafe.config.{ConfigFactory, ConfigValueType}
import org.apache.seatunnel.spark.SparkEnvironment
-import org.apache.seatunnel.spark.batch.SparkBatchSource
+import org.apache.seatunnel.spark.batch.SparkBatchSink
import org.apache.spark.sql.{Dataset, Row}
import scala.collection.JavaConversions._
-class Iceberg extends SparkBatchSource {
- override def getData(env: SparkEnvironment): Dataset[Row] = {
- val reader = env.getSparkSession.read.format("iceberg")
+class Iceberg extends SparkBatchSink {
+
+ override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
+ val writer = data.write.format("iceberg")
for (e <- config.entrySet()) {
e.getValue.valueType match {
case ConfigValueType.NUMBER =>
- reader.option(e.getKey, Long.unbox(e.getValue.unwrapped()))
+ writer.option(e.getKey, config.getLong(e.getKey))
case ConfigValueType.BOOLEAN =>
- reader.option(e.getKey, Boolean.unbox(e.getValue.unwrapped()))
- case ConfigValueType.STRING => reader.option(e.getKey, e.getValue.unwrapped().toString)
+ writer.option(e.getKey, config.getBoolean(e.getKey))
+ case ConfigValueType.STRING =>
+ writer.option(e.getKey, config.getString(e.getKey))
}
}
- val df = reader.load(config.getString("path"))
- df.createOrReplaceTempView(config.getString("result_table_name"))
- env.getSparkSession.sql(config.getString("pre_sql"))
+ writer.mode(config.getString("saveMode"))
+ .save(config.getString("path"))
}
override def checkConfig(): CheckResult = {
- checkAllExists(config, "path", "pre_sql")
+ checkAllExists(config, "path")
}
- override def prepare(prepareEnv: SparkEnvironment): Unit = {}
+ override def prepare(prepareEnv: SparkEnvironment): Unit = {
+ val defaultConfig = ConfigFactory.parseMap(
+ Map(
+ "saveMode" -> "append"))
+ config = config.withFallback(defaultConfig)
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala
index 53134d5..e5a60a7 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala
@@ -31,10 +31,11 @@ class Iceberg extends SparkBatchSource {
for (e <- config.entrySet()) {
e.getValue.valueType match {
case ConfigValueType.NUMBER =>
- reader.option(e.getKey, Long.unbox(e.getValue.unwrapped()))
+ reader.option(e.getKey, config.getLong(e.getKey))
case ConfigValueType.BOOLEAN =>
- reader.option(e.getKey, Boolean.unbox(e.getValue.unwrapped()))
- case ConfigValueType.STRING => reader.option(e.getKey, e.getValue.unwrapped().toString)
+ reader.option(e.getKey, config.getBoolean(e.getKey))
+ case ConfigValueType.STRING =>
+ reader.option(e.getKey, config.getString(e.getKey))
}
}
val df = reader.load(config.getString("path"))
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index c2dc02c..23a9f9a 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -604,7 +604,6 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) Apache Iceberg (org.apache.iceberg:iceberg-bundled-guava:0.13.1 - https://iceberg.apache.org)
(The Apache Software License, Version 2.0) Apache Iceberg (org.apache.iceberg:iceberg-common:0.13.1 - https://iceberg.apache.org)
(The Apache Software License, Version 2.0) Apache Iceberg (org.apache.iceberg:iceberg-core:0.13.1 - https://iceberg.apache.org)
- (The Apache Software License, Version 2.0) Apache Iceberg (org.apache.iceberg:iceberg-spark-runtime:0.13.1 - https://iceberg.apache.org)
(The Apache Software License, Version 2.0) Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/)
(The Apache Software License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:2.0.0 - http://kafka.apache.org)
(The Apache Software License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:2.4.1 - https://kafka.apache.org)
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 3608017..75a7f53 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -657,7 +657,6 @@ iceberg-core-0.13.1.jar
iceberg-api-0.13.1.jar
iceberg-common-0.13.1.jar
iceberg-bundled-guava-0.13.1.jar
-iceberg-spark-runtime-0.13.1.jar
caffeine-2.8.4.jar
checker-qual-3.4.0.jar
error_prone_annotations-2.3.4.jar