You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wu...@apache.org on 2022/03/02 07:18:30 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connectors] Add connector-spark-iceberg source (#1351)

This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 631db15  [Feature][Connectors] Add connector-spark-iceberg source (#1351)
631db15 is described below

commit 631db15fef26ad9c28d1f18fff1c1f1ce00066d0
Author: ououtt <73...@qq.com>
AuthorDate: Wed Mar 2 15:18:24 2022 +0800

    [Feature][Connectors] Add connector-spark-iceberg source (#1351)
    
    * iceberg spark batch source
    
    * fix license about iceberg dependency
    
    * read option; add iceberg.md file
    
    * add common options
    
    Co-authored-by: eye <ey...@aloudata.com>
---
 .../spark/configuration/source-plugins/Iceberg.md  | 52 ++++++++++++++++++
 pom.xml                                            | 12 +++++
 seatunnel-connectors/pom.xml                       |  1 +
 .../seatunnel-connector-spark-iceberg/pom.xml      | 61 ++++++++++++++++++++++
 .../org.apache.seatunnel.spark.BaseSparkSource     | 19 +++++++
 .../apache/seatunnel/spark/source/Iceberg.scala    | 50 ++++++++++++++++++
 seatunnel-core/seatunnel-core-spark/pom.xml        |  6 +++
 seatunnel-dist/release-docs/LICENSE                | 12 +++++
 seatunnel-dist/release-docs/NOTICE                 | 30 +++++++++++
 tools/dependencies/known-dependencies.txt          | 12 +++++
 10 files changed, 255 insertions(+)

diff --git a/docs/en/spark/configuration/source-plugins/Iceberg.md b/docs/en/spark/configuration/source-plugins/Iceberg.md
new file mode 100644
index 0000000..6087d98
--- /dev/null
+++ b/docs/en/spark/configuration/source-plugins/Iceberg.md
@@ -0,0 +1,52 @@
+# Iceberg
+
+> Source plugin: Iceberg [Spark]
+
+## Description
+
+Read data from Iceberg.
+
+## Options
+
+| name           | type   | required | default value |
+| -------------- | ------ | -------- | ------------- |
+| common-options |        | yes      | -             |
+| [path](#path)  | string | yes      | -             |
+| [pre_sql](#pre_sql) | string | yes | -             |
+| [snapshot-id](#snapshot-id) | long | no      | -   |
+| [as-of-timestamp](#as-of-timestamp) | long | no| - |
+
+
+Refer to [iceberg read options](https://iceberg.apache.org/docs/latest/spark-configuration/) for more configurations.
+
+### common-options
+
+Source plugin common parameters; please refer to [Source Plugin](./source-plugin.md) for details.
+
+### path
+
+Iceberg table location.
+
+### pre_sql
+
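+SQL statement used to query the Iceberg table. Note that the table name referenced in the statement must be the value of the `result_table_name` option, because the loaded table is registered as a temporary view under that name.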
+
+### snapshot-id
+
+Snapshot ID of the table snapshot to read
+
+### as-of-timestamp
+
+A timestamp in milliseconds; the snapshot used will be the snapshot current at this time.
+
+## Example
+
+```bash
+iceberg {
+    path = "hdfs://localhost:9000/iceberg/warehouse/db/table"
+    result_table_name = "my_source"
+    pre_sql="select * from my_source where dt = '2019-01-01'"
+}
+```
+
+
diff --git a/pom.xml b/pom.xml
index 0048676..d4ae949 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,6 +96,7 @@
         <spark.version>2.4.0</spark.version>
         <spark.binary.version>2.4</spark.binary.version>
         <neo4j.connector.spark.version>4.1.0</neo4j.connector.spark.version>
+        <iceberg.version>0.13.1</iceberg.version>
         <flink.version>1.13.6</flink.version>
         <hudi.version>0.10.0</hudi.version>
         <hadoop.binary.version>2.7</hadoop.binary.version>
@@ -502,6 +503,17 @@
                 <artifactId>influxdb-java</artifactId>
                 <version>${influxdb-java.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>org.apache.iceberg</groupId>
+                <artifactId>iceberg-core</artifactId>
+                <version>${iceberg.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.iceberg</groupId>
+                <artifactId>iceberg-spark-runtime</artifactId>
+                <version>${iceberg.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/seatunnel-connectors/pom.xml b/seatunnel-connectors/pom.xml
index 635d06e..c083f5d 100644
--- a/seatunnel-connectors/pom.xml
+++ b/seatunnel-connectors/pom.xml
@@ -50,6 +50,7 @@
         <module>seatunnel-connector-spark-email</module>
         <module>seatunnel-connector-spark-tidb</module>
         <module>seatunnel-connector-spark-neo4j</module>
+        <module>seatunnel-connector-spark-iceberg</module>
         <module>seatunnel-connector-flink-console</module>
         <module>seatunnel-connector-flink-druid</module>
         <module>seatunnel-connector-flink-elasticsearch</module>
diff --git a/seatunnel-connectors/seatunnel-connector-spark-iceberg/pom.xml b/seatunnel-connectors/seatunnel-connector-spark-iceberg/pom.xml
new file mode 100644
index 0000000..5b15df0
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connector-spark-iceberg/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-connectors</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-connector-spark-iceberg</artifactId>
+
+    <dependencies>
+        <!-- SeaTunnel Spark plugin API. Scope is "provided", so it is needed to compile
+             this module but is expected to be supplied by the host at runtime. -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api-spark</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Spark artifacts: no version or scope is declared here; presumably both are
+             inherited from a parent dependencyManagement section (TODO confirm). -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <!-- Iceberg artifacts: versions are not declared here; the root pom manages both
+             artifacts with ${iceberg.version} in its dependencyManagement section. -->
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-spark-runtime</artifactId>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
new file mode 100644
index 0000000..6212838
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.seatunnel.spark.source.Iceberg
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala
new file mode 100644
index 0000000..53134d5
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/source/Iceberg.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.spark.source
+
+import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueType
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSource
+import org.apache.spark.sql.{Dataset, Row}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Spark batch source that loads an Iceberg table and returns the result of `pre_sql`.
+ *
+ * Every scalar entry of the plugin config is forwarded to the Spark `DataFrameReader`
+ * as a read option (for example `snapshot-id` or `as-of-timestamp`).
+ */
+class Iceberg extends SparkBatchSource {
+
+  /**
+   * Loads the table at `path`, registers it as a temporary view named after
+   * `result_table_name`, and runs `pre_sql` against the session.
+   *
+   * @param env Spark environment that provides the session
+   * @return the dataset produced by executing `pre_sql`
+   */
+  override def getData(env: SparkEnvironment): Dataset[Row] = {
+    val reader = env.getSparkSession.read.format("iceberg")
+    for (e <- config.entrySet()) {
+      e.getValue.valueType match {
+        case ConfigValueType.NUMBER =>
+          // unwrapped() yields Integer, Long or Double depending on the literal, so
+          // unboxing straight to Long throws ClassCastException for values like `1`.
+          e.getValue.unwrapped() match {
+            case d: java.lang.Double => reader.option(e.getKey, d.doubleValue())
+            case n: java.lang.Number => reader.option(e.getKey, n.longValue())
+            case other => reader.option(e.getKey, other.toString)
+          }
+        case ConfigValueType.BOOLEAN =>
+          reader.option(e.getKey, Boolean.unbox(e.getValue.unwrapped()))
+        case ConfigValueType.STRING => reader.option(e.getKey, e.getValue.unwrapped().toString)
+        // Reader options are scalar; skip lists/nulls instead of failing with MatchError.
+        case _ =>
+      }
+    }
+    val df = reader.load(config.getString("path"))
+    df.createOrReplaceTempView(config.getString("result_table_name"))
+    env.getSparkSession.sql(config.getString("pre_sql"))
+  }
+
+  /**
+   * Validates that every option read unconditionally by `getData` is present.
+   *
+   * @return the check result reported by `checkAllExists`
+   */
+  override def checkConfig(): CheckResult = {
+    // result_table_name is required too: getData registers the temp view under it.
+    checkAllExists(config, "path", "pre_sql", "result_table_name")
+  }
+
+  /** No preparation is needed for this source. */
+  override def prepare(prepareEnv: SparkEnvironment): Unit = {}
+}
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml
index e33ab09..c2e73ca 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -162,6 +162,12 @@
 
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-spark-iceberg</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-transform-spark-split</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index e07c7bc..e330a33 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -741,6 +741,17 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) Phoenix - Spark (org.apache.phoenix:phoenix-spark:5.0.0-HBase-2.0 - http://www.apache.org/phoenix/phoenix-spark/)
      (http://asm.ow2.org/license.html) (http://www.apache.org/licenses/LICENSE-2.0.txt) Apache XBean :: ASM 6 shaded (repackaged) (org.apache.xbean:xbean-asm6-shaded:4.8 - http://geronimo.apache.org/maven/xbean/4.8/xbean-asm6-shaded)
      (Apache License, Version 2.0) Apache Yetus - Audience Annotations (org.apache.yetus:audience-annotations:0.11.0 - https://yetus.apache.org/audience-annotations)
+     (Apache License, Version 2.0) Apache Iceberg - Core (org.apache.iceberg:iceberg-core:0.13.1 - https://iceberg.apache.org/iceberg-core)
+     (Apache License, Version 2.0) Apache Iceberg - Api (org.apache.iceberg:iceberg-api:0.13.1 - https://iceberg.apache.org/iceberg-api)
+     (Apache License, Version 2.0) Apache Iceberg - Common (org.apache.iceberg:iceberg-common:0.13.1 - https://iceberg.apache.org/iceberg-common)
+     (Apache License, Version 2.0) Apache Iceberg - Bundled Guava (org.apache.iceberg:iceberg-bundled-guava:0.13.1 - https://iceberg.apache.org/iceberg-bundled-guava)
+     (Apache License, Version 2.0) Apache Iceberg - Spark Runtime (org.apache.iceberg:iceberg-spark-runtime:0.13.1 - https://iceberg.apache.org/iceberg-spark-runtime)
+     (Apache License, Version 2.0) Caffeine cache (com.github.ben-manes.caffeine:caffeine:2.8.4 - https://github.com/ben-manes/caffeine)
+     (Apache 2.0) error-prone annotations (com.google.errorprone:error_prone_annotations:2.3.4 - https://errorprone.info/error_prone_annotations)
+     (Apache 2) org.roaringbitmap:RoaringBitmap (org.roaringbitmap:RoaringBitmap:0.9.22 - https://github.com/RoaringBitmap/RoaringBitmap)
+     (Apache 2) org.roaringbitmap:shims (org.roaringbitmap:shims:0.9.22 - https://github.com/RoaringBitmap/RoaringBitmap)
+     (Apache 2) org.roaringbitmap:RoaringBitmap (org.roaringbitmap:RoaringBitmap:0.5.11 - https://github.com/RoaringBitmap/RoaringBitmap)
+     (The Apache Software License, Version 2.0) Jackson-core (com.fasterxml.jackson.core:jackson-core:2.11.4 - https://github.com/FasterXML/jackson-core)
 
 
 ========================================================================
@@ -768,6 +779,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (MIT License) SLF4J LOG4J-12 Binding (org.slf4j:slf4j-log4j12:1.7.16 - http://www.slf4j.org)
      (MIT License) SLF4J LOG4J-12 Binding (org.slf4j:slf4j-log4j12:1.7.25 - http://www.slf4j.org)
      (MIT-License) spoiwo (com.norbitltd:spoiwo_2.11:1.8.0 - https://github.com/norbert-radyk/spoiwo/)
+     (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.4.0 - https://checkerframework.org)
 
 ========================================================================
 BSD licenses
diff --git a/seatunnel-dist/release-docs/NOTICE b/seatunnel-dist/release-docs/NOTICE
index 1bdaaf4..ba9163c 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -4121,5 +4121,35 @@ Apache XBean NOTICE
 =========================================================================
 
 
+Apache Iceberg NOTICE
+
+=========================================================================
+Apache Iceberg
+Copyright 2017-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+--------------------------------------------------------------------------------
+
+This project includes code from Kite, developed at Cloudera, Inc. with
+the following copyright notice:
+
+| Copyright 2013 Cloudera Inc.
+|
+| Licensed under the Apache License, Version 2.0 (the "License");
+| you may not use this file except in compliance with the License.
+| You may obtain a copy of the License at
+|
+|   http://www.apache.org/licenses/LICENSE-2.0
+|
+| Unless required by applicable law or agreed to in writing, software
+| distributed under the License is distributed on an "AS IS" BASIS,
+| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+| See the License for the specific language governing permissions and
+| limitations under the License.
+=========================================================================
+
+
 
 =========================================================================
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 7761a8f..e8f8c74 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -652,3 +652,15 @@ zookeeper-3.5.9.jar
 zookeeper-jute-3.5.9.jar
 zstd-jni-1.3.3-1.jar
 zstd-jni-1.4.3-1.jar
+iceberg-core-0.13.1.jar
+iceberg-api-0.13.1.jar
+iceberg-common-0.13.1.jar
+iceberg-bundled-guava-0.13.1.jar
+iceberg-spark-runtime-0.13.1.jar
+caffeine-2.8.4.jar
+checker-qual-3.4.0.jar
+error_prone_annotations-2.3.4.jar
+RoaringBitmap-0.9.22.jar
+RoaringBitmap-0.5.11.jar
+jackson-core-2.11.4.jar
+shims-0.9.22.jar