Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/03/23 04:59:46 UTC

[iotdb] branch master updated: [IOTDB-1214] Add Flink-IoTDB documents to the website (#2813)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ec70703  [IOTDB-1214] Add Flink-IoTDB documents to the website (#2813)
ec70703 is described below

commit ec7070330969cec8d53d3cbf7cf437da35a5e59d
Author: Sunny-Island <41...@users.noreply.github.com>
AuthorDate: Tue Mar 23 12:59:24 2021 +0800

    [IOTDB-1214] Add Flink-IoTDB documents to the website (#2813)
    
    Co-authored-by: Xiangdong Huang <hx...@apache.org>
---
 .../UserGuide/Ecosystem Integration/Flink IoTDB.md | 124 ++++++++++++++
 .../Ecosystem Integration/Flink TsFile.md          | 180 +++++++++++++++++++++
 .../UserGuide/Ecosystem Integration/Flink IoTDB.md | 122 ++++++++++++++
 .../Ecosystem Integration/Flink Tsfile.md          | 179 ++++++++++++++++++++
 site/src/main/.vuepress/config.js                  |   8 +-
 5 files changed, 611 insertions(+), 2 deletions(-)

diff --git a/docs/UserGuide/Ecosystem Integration/Flink IoTDB.md b/docs/UserGuide/Ecosystem Integration/Flink IoTDB.md
new file mode 100644
index 0000000..b0c4954
--- /dev/null
+++ b/docs/UserGuide/Ecosystem Integration/Flink IoTDB.md	
@@ -0,0 +1,124 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+        http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+## IoTDB-Flink-Connector 
+
+IoTDB integration for [Apache Flink](https://flink.apache.org/). This module includes the IoTDB sink, which allows a Flink job to write events into IoTDB timeseries.
+
+### IoTDBSink
+
+To use the `IoTDBSink`, you need to construct an instance of it by specifying `IoTDBOptions` and `IoTSerializationSchema` instances.
+The `IoTDBSink` sends events one at a time by default; you can switch to batched writes by invoking `withBatchSize(int)`.
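+
+For instance, switching the sink from per-event writes to batched writes looks like the following minimal sketch; it reuses the `options` and `serializationSchema` objects that are constructed in the full example below:
+
+```java
+// Minimal sketch; `options` and `serializationSchema` are built exactly as in the full example below.
+IoTDBSink ioTDBSink =
+    new IoTDBSink(options, serializationSchema)
+        .withBatchSize(100)        // buffer up to 100 events per write instead of sending one at a time
+        .withSessionPoolSize(3);   // number of connections to the server per parallel sink instance
+```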
+
+### Example
+
+This example shows a case where data is sent from a Flink job to an IoTDB server:
+
+- A simulated source, `SensorSource`, generates one data point per second.
+- Flink uses `IoTDBSink` to consume the generated data points and write the data into IoTDB.
+
+```java
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class FlinkIoTDBSink {
+  public static void main(String[] args) throws Exception {
+    // run the flink job on local mini cluster
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    IoTDBOptions options = new IoTDBOptions();
+    options.setHost("127.0.0.1");
+    options.setPort(6667);
+    options.setUser("root");
+    options.setPassword("root");
+    options.setStorageGroup("root.sg");
+
+    // If the server enables auto_create_schema, then we do not need to register all timeseries
+    // here.
+    options.setTimeseriesOptionList(
+        Lists.newArrayList(
+            new IoTDBOptions.TimeseriesOption(
+                "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY)));
+
+    IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
+    IoTDBSink ioTDBSink =
+        new IoTDBSink(options, serializationSchema)
+            // enable batching
+            .withBatchSize(10)
+            // how many connections to the server will be created for each parallel instance
+            .withSessionPoolSize(3);
+
+    env.addSource(new SensorSource())
+        .name("sensor-source")
+        .setParallelism(1)
+        .addSink(ioTDBSink)
+        .name("iotdb-sink");
+
+    env.execute("iotdb-flink-example");
+  }
+
+  private static class SensorSource implements SourceFunction<Map<String, String>> {
+    boolean running = true;
+    Random random = new SecureRandom();
+
+    @Override
+    public void run(SourceContext context) throws Exception {
+      while (running) {
+        Map<String, String> tuple = new HashMap<>();
+        tuple.put("device", "root.sg.d1");
+        tuple.put("timestamp", String.valueOf(System.currentTimeMillis()));
+        tuple.put("measurements", "s1");
+        tuple.put("types", "DOUBLE");
+        tuple.put("values", String.valueOf(random.nextDouble()));
+
+        context.collect(tuple);
+        Thread.sleep(1000);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      running = false;
+    }
+  }
+}
+
+```
+
+### Usage
+
+* Launch the IoTDB server.
+* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to launch the Flink job on a local mini cluster.
diff --git a/docs/UserGuide/Ecosystem Integration/Flink TsFile.md b/docs/UserGuide/Ecosystem Integration/Flink TsFile.md
new file mode 100644
index 0000000..dfa5e89
--- /dev/null
+++ b/docs/UserGuide/Ecosystem Integration/Flink TsFile.md	
@@ -0,0 +1,180 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+        http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+## TsFile-Flink-Connector
+
+### About TsFile-Flink-Connector
+
+TsFile-Flink-Connector adds Flink support for external data sources of the TsFile type.
+This enables users to read and write TsFiles with Flink via the DataStream/DataSet API.
+
+With this connector, you can
+
+* load a single TsFile or multiple TsFiles (DataSet only), from either the local file system or HDFS, into Flink
+* load all files in a specific directory, from either the local file system or HDFS, into Flink (see the sketch below)
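+
+For example, loading every TsFile under a directory could look like the following sketch. It assumes `TsFileInputFormat` follows Flink's standard `FileInputFormat` behavior, where a path that points to a directory causes all files in it to be read; `queryExpression` and `parser` are built as in the Quick Start below, and the directory path is only a placeholder:
+
+```java
+// Sketch only: relies on Flink's FileInputFormat directory handling; replace the path with your own.
+TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+inputFormat.setFilePath("hdfs://namenode:8020/data/tsfiles");  // or a local directory such as "/data/tsfiles"
+DataSet<Row> rows = ExecutionEnvironment.getExecutionEnvironment().createInput(inputFormat);
+```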
+
+### Quick Start
+
+#### TsFileInputFormat Example
+
+1. Create a TsFileInputFormat with the default RowRowRecordParser.
+
+```java
+String[] filedNames = {
+	QueryConstant.RESERVED_TIME,
+	"device_1.sensor_1",
+	"device_1.sensor_2",
+	"device_1.sensor_3",
+	"device_2.sensor_1",
+	"device_2.sensor_2",
+	"device_2.sensor_3"
+};
+TypeInformation[] typeInformations = new TypeInformation[] {
+	Types.LONG,
+	Types.FLOAT,
+	Types.INT,
+	Types.INT,
+	Types.FLOAT,
+	Types.INT,
+	Types.INT
+};
+List<Path> paths = Arrays.stream(filedNames)
+	.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+	.map(Path::new)
+	.collect(Collectors.toList());
+RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+QueryExpression queryExpression = QueryExpression.create(paths, null);
+RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+```
+
+2. Read data from the input format and print to stdout:
+
+DataStream:
+
+```java
+StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataStream<Row> source = senv.createInput(inputFormat);
+DataStream<String> rowString = source.map(Row::toString);
+Iterator<String> result = DataStreamUtils.collect(rowString);
+while (result.hasNext()) {
+	System.out.println(result.next());
+}
+```
+
+DataSet:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataSet<Row> source = env.createInput(inputFormat);
+List<String> result = source.map(Row::toString).collect();
+for (String s : result) {
+	System.out.println(s);
+}
+```
+
+#### TSRecordOutputFormat Example
+
+1. Create a TSRecordOutputFormat with the default RowTSRecordConverter.
+
+```java
+String[] filedNames = {
+	QueryConstant.RESERVED_TIME,
+	"device_1.sensor_1",
+	"device_1.sensor_2",
+	"device_1.sensor_3",
+	"device_2.sensor_1",
+	"device_2.sensor_2",
+	"device_2.sensor_3"
+};
+TypeInformation[] typeInformations = new TypeInformation[] {
+	Types.LONG,
+	Types.LONG,
+	Types.LONG,
+	Types.LONG,
+	Types.LONG,
+	Types.LONG,
+	Types.LONG
+};
+RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+Schema schema = new Schema();
+schema.extendTemplate("template", new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
+schema.extendTemplate("template", new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
+schema.extendTemplate("template", new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));
+RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
+TSRecordOutputFormat<Row> outputFormat = new TSRecordOutputFormat<>(schema, converter);
+```
+
+2. Write data via the output format:
+
+DataStream:
+
+```java
+StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+senv.setParallelism(1);
+List<Tuple7> data = new ArrayList<>(7);
+data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
+data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
+data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
+data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
+data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
+data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
+data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
+outputFormat.setOutputFilePath(new org.apache.flink.core.fs.Path(path));
+DataStream<Tuple7> source = senv.fromCollection(
+	data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
+source.map(t -> {
+	Row row = new Row(7);
+	for (int i = 0; i < 7; i++) {
+		row.setField(i, t.getField(i));
+	}
+	return row;
+}).returns(rowTypeInfo).writeUsingOutputFormat(outputFormat);
+senv.execute();
+```
+
+DataSet:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+List<Tuple7> data = new ArrayList<>(7);
+data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
+data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
+data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
+data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
+data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
+data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
+data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
+DataSet<Tuple7> source = env.fromCollection(
+	data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
+source.map(t -> {
+	Row row = new Row(7);
+	for (int i = 0; i < 7; i++) {
+		row.setField(i, t.getField(i));
+	}
+	return row;
+}).returns(rowTypeInfo).write(outputFormat, path);
+env.execute();
+```
+
diff --git a/docs/zh/UserGuide/Ecosystem Integration/Flink IoTDB.md b/docs/zh/UserGuide/Ecosystem Integration/Flink IoTDB.md
new file mode 100644
index 0000000..b739325
--- /dev/null
+++ b/docs/zh/UserGuide/Ecosystem Integration/Flink IoTDB.md	
@@ -0,0 +1,122 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+        http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+## Flink IoTDB Connector
+
+IoTDB integration for [Apache Flink](https://flink.apache.org/). This module includes the IoTDB sink, which allows a Flink job to write timeseries data into IoTDB.
+
+### IoTDBSink
+
+To use the `IoTDBSink`, you need to construct an instance of it by specifying `IoTDBOptions` and `IoTSerializationSchema` instances. The `IoTDBSink` sends events one at a time by default; you can switch to batched writes by invoking `withBatchSize(int)`.
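+
+For instance, switching the sink from per-event writes to batched writes looks like the following minimal sketch; it reuses the `options` and `serializationSchema` objects that are constructed in the full example below:
+
+```java
+// Minimal sketch; `options` and `serializationSchema` are built exactly as in the full example below.
+IoTDBSink ioTDBSink =
+    new IoTDBSink(options, serializationSchema)
+        .withBatchSize(100)        // buffer up to 100 events per write instead of sending one at a time
+        .withSessionPoolSize(3);   // number of connections to the server per parallel sink instance
+```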
+
+### Example
+
+This example shows a case where data is sent from a Flink job to an IoTDB server:
+
+- A simulated source, `SensorSource`, generates one data point per second.
+
+- Flink uses `IoTDBSink` to consume the generated data points and write the data into IoTDB.
+
+  ```java
+  import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+  import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+  import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+  
+  import com.google.common.collect.Lists;
+  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+  import org.apache.flink.streaming.api.functions.source.SourceFunction;
+  
+  import java.security.SecureRandom;
+  import java.util.HashMap;
+  import java.util.Map;
+  import java.util.Random;
+  
+  public class FlinkIoTDBSink {
+    public static void main(String[] args) throws Exception {
+      // run the flink job on local mini cluster
+      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+  
+      IoTDBOptions options = new IoTDBOptions();
+      options.setHost("127.0.0.1");
+      options.setPort(6667);
+      options.setUser("root");
+      options.setPassword("root");
+      options.setStorageGroup("root.sg");
+  
+      // If the server enables auto_create_schema, then we do not need to register all timeseries
+      // here.
+      options.setTimeseriesOptionList(
+          Lists.newArrayList(
+              new IoTDBOptions.TimeseriesOption(
+                  "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY)));
+  
+      IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
+      IoTDBSink ioTDBSink =
+          new IoTDBSink(options, serializationSchema)
+              // enable batching
+              .withBatchSize(10)
+              // how many connections to the server will be created for each parallel instance
+              .withSessionPoolSize(3);
+  
+      env.addSource(new SensorSource())
+          .name("sensor-source")
+          .setParallelism(1)
+          .addSink(ioTDBSink)
+          .name("iotdb-sink");
+  
+      env.execute("iotdb-flink-example");
+    }
+  
+    private static class SensorSource implements SourceFunction<Map<String, String>> {
+      boolean running = true;
+      Random random = new SecureRandom();
+  
+      @Override
+      public void run(SourceContext context) throws Exception {
+        while (running) {
+          Map<String, String> tuple = new HashMap<>();
+          tuple.put("device", "root.sg.d1");
+          tuple.put("timestamp", String.valueOf(System.currentTimeMillis()));
+          tuple.put("measurements", "s1");
+          tuple.put("types", "DOUBLE");
+          tuple.put("values", String.valueOf(random.nextDouble()));
+  
+          context.collect(tuple);
+          Thread.sleep(1000);
+        }
+      }
+  
+      @Override
+      public void cancel() {
+        running = false;
+      }
+    }
+  }
+  
+  ```
+
+  
+
+
+### Usage
+
+* Launch the IoTDB server.
+* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to launch the Flink job on a local mini cluster.
diff --git a/docs/zh/UserGuide/Ecosystem Integration/Flink Tsfile.md b/docs/zh/UserGuide/Ecosystem Integration/Flink Tsfile.md
new file mode 100644
index 0000000..477a7ad
--- /dev/null
+++ b/docs/zh/UserGuide/Ecosystem Integration/Flink Tsfile.md	
@@ -0,0 +1,179 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+        http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+## TsFile-Flink Connector
+
+### About the TsFile-Flink Connector
+
+TsFile-Flink-Connector adds Flink support for external data sources of the TsFile type. This enables users to read, write, and query TsFiles with Flink via the DataStream/DataSet API.
+
+With this connector, you can
+
+* load a single TsFile or multiple TsFiles (DataSet only), from either the local file system or HDFS, into Flink
+* load all files in a specific directory, from either the local file system or HDFS, into Flink (see the sketch below)
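+
+For example, loading every TsFile under a directory could look like the following sketch. It assumes `TsFileInputFormat` follows Flink's standard `FileInputFormat` behavior, where a path that points to a directory causes all files in it to be read; `queryExpression` and `parser` are built as in the Quick Start below, and the directory path is only a placeholder:
+
+```java
+// Sketch only: relies on Flink's FileInputFormat directory handling; replace the path with your own.
+TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+inputFormat.setFilePath("hdfs://namenode:8020/data/tsfiles");  // or a local directory such as "/data/tsfiles"
+DataSet<Row> rows = ExecutionEnvironment.getExecutionEnvironment().createInput(inputFormat);
+```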
+
+### Quick Start
+
+#### TsFileInputFormat Example
+
+1. Create a TsFileInputFormat with the default RowRowRecordParser.
+
+```java
+String[] filedNames = {
+	QueryConstant.RESERVED_TIME,
+	"device_1.sensor_1",
+	"device_1.sensor_2",
+	"device_1.sensor_3",
+	"device_2.sensor_1",
+	"device_2.sensor_2",
+	"device_2.sensor_3"
+};
+TypeInformation[] typeInformations = new TypeInformation[] {
+	Types.LONG,
+	Types.FLOAT,
+	Types.INT,
+	Types.INT,
+	Types.FLOAT,
+	Types.INT,
+	Types.INT
+};
+List<Path> paths = Arrays.stream(filedNames)
+	.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+	.map(Path::new)
+	.collect(Collectors.toList());
+RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+QueryExpression queryExpression = QueryExpression.create(paths, null);
+RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+```
+
+2. Read data from the input format and print to stdout:
+
+DataStream:
+
+```java
+StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataStream<Row> source = senv.createInput(inputFormat);
+DataStream<String> rowString = source.map(Row::toString);
+Iterator<String> result = DataStreamUtils.collect(rowString);
+while (result.hasNext()) {
+	System.out.println(result.next());
+}
+```
+
+DataSet:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataSet<Row> source = env.createInput(inputFormat);
+List<String> result = source.map(Row::toString).collect();
+for (String s : result) {
+	System.out.println(s);
+}
+```
+
+#### TSRecordOutputFormat Example
+
+1. Create a TSRecordOutputFormat with the default RowTSRecordConverter.
+
+```java
+String[] filedNames = {
+	QueryConstant.RESERVED_TIME,
+	"device_1.sensor_1",
+	"device_1.sensor_2",
+	"device_1.sensor_3",
+	"device_2.sensor_1",
+	"device_2.sensor_2",
+	"device_2.sensor_3"
+};
+TypeInformation[] typeInformations = new TypeInformation[] {
+	Types.LONG,
+	Types.LONG,
+	Types.LONG,
+	Types.LONG,
+	Types.LONG,
+	Types.LONG,
+	Types.LONG
+};
+RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+Schema schema = new Schema();
+schema.extendTemplate("template", new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
+schema.extendTemplate("template", new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
+schema.extendTemplate("template", new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));
+RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
+TSRecordOutputFormat<Row> outputFormat = new TSRecordOutputFormat<>(schema, converter);
+```
+
+2. Write data via the output format:
+
+DataStream:
+
+```java
+StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+senv.setParallelism(1);
+List<Tuple7> data = new ArrayList<>(7);
+data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
+data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
+data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
+data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
+data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
+data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
+data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
+outputFormat.setOutputFilePath(new org.apache.flink.core.fs.Path(path));
+DataStream<Tuple7> source = senv.fromCollection(
+	data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
+source.map(t -> {
+	Row row = new Row(7);
+	for (int i = 0; i < 7; i++) {
+		row.setField(i, t.getField(i));
+	}
+	return row;
+}).returns(rowTypeInfo).writeUsingOutputFormat(outputFormat);
+senv.execute();
+```
+
+DataSet:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+List<Tuple7> data = new ArrayList<>(7);
+data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
+data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
+data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
+data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
+data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
+data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
+data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
+DataSet<Tuple7> source = env.fromCollection(
+	data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
+source.map(t -> {
+	Row row = new Row(7);
+	for (int i = 0; i < 7; i++) {
+		row.setField(i, t.getField(i));
+	}
+	return row;
+}).returns(rowTypeInfo).write(outputFormat, path);
+env.execute();
+```
+
diff --git a/site/src/main/.vuepress/config.js b/site/src/main/.vuepress/config.js
index 8f8358d..01fffff 100644
--- a/site/src/main/.vuepress/config.js
+++ b/site/src/main/.vuepress/config.js
@@ -585,7 +585,9 @@ var config = {
 							['Ecosystem Integration/Spark TsFile','Spark TsFile'],
 							['Ecosystem Integration/Spark IoTDB','Spark IoTDB'],
 							['Ecosystem Integration/Hive TsFile','Hive TsFile'],
-							['Ecosystem Integration/Zeppelin-IoTDB','Zeppelin-IoTDB']
+							['Ecosystem Integration/Zeppelin-IoTDB','Zeppelin-IoTDB'],
+							['Ecosystem Integration/Flink IoTDB','Flink IoTDB'],
+							['Ecosystem Integration/Flink Tsfile','Flink Tsfile']
 						]
 					},
 					{
@@ -1210,7 +1212,9 @@ var config = {
 							['Ecosystem Integration/Spark TsFile','Spark TsFile'],
 							['Ecosystem Integration/Spark IoTDB','Spark IoTDB'],
 							['Ecosystem Integration/Hive TsFile','Hive TsFile'],
-							['Ecosystem Integration/Zeppelin-IoTDB','Zeppelin-IoTDB']
+							['Ecosystem Integration/Zeppelin-IoTDB','Zeppelin-IoTDB'],
+							['Ecosystem Integration/Flink IoTDB','Flink IoTDB'],
+							['Ecosystem Integration/Flink Tsfile','Flink Tsfile']
 						]
 					},
 					{