Posted to commits@hudi.apache.org by da...@apache.org on 2022/09/06 06:48:22 UTC

[hudi] branch asf-site updated: [HUDI-4786][DOCS] Add Flink DataStream API demo in Flink Guide (#6582)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 93c8bfd7e9 [HUDI-4786][DOCS] Add Flink DataStream API demo in Flink Guide (#6582)
93c8bfd7e9 is described below

commit 93c8bfd7e9af0d595042314fcbeba42d5c4660e4
Author: superche <73...@users.noreply.github.com>
AuthorDate: Tue Sep 6 14:48:10 2022 +0800

    [HUDI-4786][DOCS] Add Flink DataStream API demo in Flink Guide (#6582)
    
    Co-authored-by: superche <su...@tencent.com>
---
 website/docs/flink-quick-start-guide.md            | 151 ++++++++++++++++++++-
 .../version-0.12.0/flink-quick-start-guide.md      | 149 ++++++++++++++++++++
 2 files changed, 299 insertions(+), 1 deletion(-)

diff --git a/website/docs/flink-quick-start-guide.md b/website/docs/flink-quick-start-guide.md
index f4b9668178..5d1c9784ac 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -3,6 +3,8 @@ title: "Flink Guide"
 toc: true
 last_modified_at: 2020-08-12T15:19:57+08:00
 ---
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
 
 This page introduces the Flink-Hudi integration, showing how Flink brings the power of streaming into Hudi.
 This guide helps you quickly start using Flink on Hudi, and learn the different modes for reading/writing Hudi from Flink:
@@ -18,13 +20,22 @@ This guide helps you quickly start using Flink on Hudi, and learn different mode
 ## Quick Start
 
 ### Setup
+<Tabs
+defaultValue="flinksql"
+values={[
+{ label: 'Flink SQL', value: 'flinksql', },
+{ label: 'DataStream API', value: 'dataStream', },
+]}
+>
+
+<TabItem value="flinksql">
 
 We use the [Flink SQL Client](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/) because it's a good
 quick start tool for SQL users.
 
 #### Step.1 Download the Flink jar
 
-Hudi works with both Flink 1.13, Flink 1.14, Flink 1.15. You can follow the
+Hudi works with Flink 1.13, Flink 1.14 and Flink 1.15. You can follow the
 instructions [here](https://flink.apache.org/downloads) for setting up Flink. Then choose the desired Hudi-Flink bundle
 jar to work with different Flink and Scala versions:
 
@@ -74,9 +85,56 @@ export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
 
 Set up the table name and base path, and operate using SQL for this guide.
 The SQL CLI executes SQL statements one at a time.
+</TabItem>
+
+<TabItem value="dataStream">
+
+Hudi works with Flink 1.13, Flink 1.14 and Flink 1.15. Please add the desired
+dependency to your project:
+```xml
+<!-- Flink 1.13 -->
+<dependency>
+    <groupId>org.apache.hudi</groupId>
+    <artifactId>hudi-flink1.13-bundle</artifactId>
+    <version>0.12.0</version>
+</dependency>
+```
+
+```xml
+<!-- Flink 1.14 -->
+<dependency>
+    <groupId>org.apache.hudi</groupId>
+    <artifactId>hudi-flink1.14-bundle</artifactId>
+    <version>0.12.0</version>
+</dependency>
+```
+
+```xml
+<!-- Flink 1.15 -->
+<dependency>
+    <groupId>org.apache.hudi</groupId>
+    <artifactId>hudi-flink1.15-bundle</artifactId>
+    <version>0.12.0</version>
+</dependency>
+```
+
+</TabItem>
+
+</Tabs>
 
 ### Insert Data
 
+<Tabs
+defaultValue="flinksql"
+values={[
+{ label: 'Flink SQL', value: 'flinksql', },
+{ label: 'DataStream API', value: 'dataStream', },
+]}
+>
+
+<TabItem value="flinksql">
+
 Create a Flink Hudi table first and insert data into it using SQL `VALUES` as below.
 
 ```sql
@@ -108,13 +166,104 @@ INSERT INTO t1 VALUES
   ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
   ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
 ```
+</TabItem>
+
+<TabItem value="dataStream">
+
+Create a Flink Hudi table first and insert data into it using the DataStream API as below.
+
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.HoodiePipeline;
+
+import java.util.HashMap;
+import java.util.Map;
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+String targetTable = "t1";
+String basePath = "file:///tmp/t1";
+
+Map<String, String> options = new HashMap<>();
+options.put(FlinkOptions.PATH.key(), basePath);
+options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
+
+DataStream<RowData> dataStream = env.addSource(...); // plug in a source that produces RowData records (see the sketch below)
+HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
+    .column("uuid VARCHAR(20)")
+    .column("name VARCHAR(10)")
+    .column("age INT")
+    .column("ts TIMESTAMP(3)")
+    .column("`partition` VARCHAR(20)")
+    .pk("uuid")
+    .partition("partition")
+    .options(options);
+
+builder.sink(dataStream, false); // the second parameter indicates whether the input data stream is bounded
+env.execute("Api_Sink");
+```
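+
+The `env.addSource(...)` call above is a placeholder for any source that produces `RowData` records. As a minimal sketch for local experiments, a bounded test source matching the declared schema could look like the following (the `GenericRowData` construction uses Flink's internal row format and is shown purely for illustration):
+
+```java
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+
+// one record matching the declared columns: uuid, name, age, ts, `partition`
+DataStream<RowData> dataStream = env.fromElements((RowData) GenericRowData.of(
+    StringData.fromString("id1"),         // uuid VARCHAR(20)
+    StringData.fromString("Danny"),       // name VARCHAR(10)
+    23,                                   // age INT
+    TimestampData.fromEpochMillis(1000L), // ts TIMESTAMP(3)
+    StringData.fromString("par1")));      // `partition` VARCHAR(20)
+```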
+</TabItem>
+
+</Tabs>
 
 ### Query Data
 
+<Tabs
+defaultValue="flinksql"
+values={[
+{ label: 'Flink SQL', value: 'flinksql', },
+{ label: 'DataStream API', value: 'dataStream', },
+]}
+>
+
+<TabItem value="flinksql">
+
 ```sql
 -- query from the Hudi table
 select * from t1;
 ```
+</TabItem>
+
+<TabItem value="dataStream">
+
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.HoodiePipeline;
+
+import java.util.HashMap;
+import java.util.Map;
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+String targetTable = "t1";
+String basePath = "file:///tmp/t1";
+
+Map<String, String> options = new HashMap<>();
+options.put(FlinkOptions.PATH.key(), basePath);
+options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); // this option enables streaming read
+options.put(FlinkOptions.READ_START_COMMIT.key(), "20210316134557"); // specifies the start commit instant time
+
+HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
+    .column("uuid VARCHAR(20)")
+    .column("name VARCHAR(10)")
+    .column("age INT")
+    .column("ts TIMESTAMP(3)")
+    .column("`partition` VARCHAR(20)")
+    .pk("uuid")
+    .partition("partition")
+    .options(options);
+
+DataStream<RowData> rowDataDataStream = builder.source(env);
+rowDataDataStream.print();
+env.execute("Api_Source");
+```
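+
+Each `RowData` record carries the columns in the order declared on the builder, so typed fields can be read by position. A small illustrative sketch (the `map` projection below is an example, not part of the Hudi API):
+
+```java
+// project uuid and age out of the stream instead of printing raw RowData
+rowDataDataStream
+    .map(row -> row.getString(0) + "," + row.getInt(2))
+    .print();
+```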
+</TabItem>
+
+</Tabs>
 
 This statement queries the snapshot view of the dataset.
 Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all the table types and query types supported.
diff --git a/website/versioned_docs/version-0.12.0/flink-quick-start-guide.md b/website/versioned_docs/version-0.12.0/flink-quick-start-guide.md
index 4a926aad04..2f33027cf6 100644
--- a/website/versioned_docs/version-0.12.0/flink-quick-start-guide.md
+++ b/website/versioned_docs/version-0.12.0/flink-quick-start-guide.md
@@ -3,6 +3,8 @@ title: "Flink Guide"
 toc: true
 last_modified_at: 2020-08-12T15:19:57+08:00
 ---
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
 
 This page introduces the Flink-Hudi integration, showing how Flink brings the power of streaming into Hudi.
 This guide helps you quickly start using Flink on Hudi, and learn the different modes for reading/writing Hudi from Flink:
@@ -18,6 +20,15 @@ This guide helps you quickly start using Flink on Hudi, and learn different mode
 ## Quick Start
 
 ### Setup
+<Tabs
+defaultValue="flinksql"
+values={[
+{ label: 'Flink SQL', value: 'flinksql', },
+{ label: 'DataStream API', value: 'dataStream', },
+]}
+>
+
+<TabItem value="flinksql">
 
 We use the [Flink SQL Client](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/) because it's a good
 quick start tool for SQL users.
@@ -74,9 +85,56 @@ export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
 
 Set up the table name and base path, and operate using SQL for this guide.
 The SQL CLI executes SQL statements one at a time.
+</TabItem>
+
+<TabItem value="dataStream">
+
+Hudi works with Flink 1.13, Flink 1.14 and Flink 1.15. Please add the desired 
+dependency to your project:
+```xml
+<!-- Flink 1.13 -->
+<dependency>
+    <groupId>org.apache.hudi</groupId>
+    <artifactId>hudi-flink1.13-bundle</artifactId>
+    <version>0.12.0</version>
+</dependency>
+```
+
+```xml
+<!-- Flink 1.14 -->
+<dependency>
+    <groupId>org.apache.hudi</groupId>
+    <artifactId>hudi-flink1.14-bundle</artifactId>
+    <version>0.12.0</version>
+</dependency>
+```
+
+```xml
+<!-- Flink 1.15 -->
+<dependency>
+    <groupId>org.apache.hudi</groupId>
+    <artifactId>hudi-flink1.15-bundle</artifactId>
+    <version>0.12.0</version>
+</dependency>
+```
+
+</TabItem>
+
+</Tabs>
 
 ### Insert Data
 
+<Tabs
+defaultValue="flinksql"
+values={[
+{ label: 'Flink SQL', value: 'flinksql', },
+{ label: 'DataStream API', value: 'dataStream', },
+]}
+>
+
+<TabItem value="flinksql">
+
 Create a Flink Hudi table first and insert data into it using SQL `VALUES` as below.
 
 ```sql
@@ -108,13 +166,104 @@ INSERT INTO t1 VALUES
   ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
   ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
 ```
+</TabItem>
+
+<TabItem value="dataStream">
+
+Create a Flink Hudi table first and insert data into it using the DataStream API as below.
+
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.HoodiePipeline;
+
+import java.util.HashMap;
+import java.util.Map;
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+String targetTable = "t1";
+String basePath = "file:///tmp/t1";
+
+Map<String, String> options = new HashMap<>();
+options.put(FlinkOptions.PATH.key(), basePath);
+options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
+
+DataStream<RowData> dataStream = env.addSource(...); // plug in a source that produces RowData records
+HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
+    .column("uuid VARCHAR(20)")
+    .column("name VARCHAR(10)")
+    .column("age INT")
+    .column("ts TIMESTAMP(3)")
+    .column("`partition` VARCHAR(20)")
+    .pk("uuid")
+    .partition("partition")
+    .options(options);
+
+builder.sink(dataStream, false); // the second parameter indicates whether the input data stream is bounded
+env.execute("Api_Sink");
+```
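+
+As the comment above notes, the second parameter of `sink` marks the input as bounded. A hedged variant for one-off backfill jobs, assuming `dataStream` is a finite source (the job name is illustrative):
+
+```java
+builder.sink(dataStream, true); // bounded input: the job finishes once the input is consumed
+env.execute("Api_Sink_Bounded");
+```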
+</TabItem>
+
+</Tabs>
 
 ### Query Data
 
+<Tabs
+defaultValue="flinksql"
+values={[
+{ label: 'Flink SQL', value: 'flinksql', },
+{ label: 'DataStream API', value: 'dataStream', },
+]}
+>
+
+<TabItem value="flinksql">
+
 ```sql
 -- query from the Hudi table
 select * from t1;
 ```
+</TabItem>
+
+<TabItem value="dataStream">
+
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.HoodiePipeline;
+
+import java.util.HashMap;
+import java.util.Map;
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+String targetTable = "t1";
+String basePath = "file:///tmp/t1";
+
+Map<String, String> options = new HashMap<>();
+options.put(FlinkOptions.PATH.key(), basePath);
+options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); // this option enables streaming read
+options.put(FlinkOptions.READ_START_COMMIT.key(), "20210316134557"); // specifies the start commit instant time
+
+HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
+    .column("uuid VARCHAR(20)")
+    .column("name VARCHAR(10)")
+    .column("age INT")
+    .column("ts TIMESTAMP(3)")
+    .column("`partition` VARCHAR(20)")
+    .pk("uuid")
+    .partition("partition")
+    .options(options);
+
+DataStream<RowData> rowDataDataStream = builder.source(env);
+rowDataDataStream.print();
+env.execute("Api_Source");
+```
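+
+For a one-shot read of the latest snapshot instead of a continuous stream, a hedged variant is to adjust the options before building the pipeline:
+
+```java
+options.put(FlinkOptions.READ_AS_STREAMING.key(), "false"); // bounded snapshot read
+options.remove(FlinkOptions.READ_START_COMMIT.key());       // not needed for a plain snapshot read
+```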
+</TabItem>
+
+</Tabs>
 
 This statement queries the snapshot view of the dataset.
 Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all the table types and query types supported.