You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/03/23 04:59:46 UTC
[iotdb] branch master updated: [IOTDB-1214] Add Flink-IoTDB
documents to the website (#2813)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ec70703 [IOTDB-1214] Add Flink-IoTDB documents to the website (#2813)
ec70703 is described below
commit ec7070330969cec8d53d3cbf7cf437da35a5e59d
Author: Sunny-Island <41...@users.noreply.github.com>
AuthorDate: Tue Mar 23 12:59:24 2021 +0800
[IOTDB-1214] Add Flink-IoTDB documents to the website (#2813)
Co-authored-by: Xiangdong Huang <hx...@apache.org>
---
.../UserGuide/Ecosystem Integration/Flink IoTDB.md | 124 ++++++++++++++
.../Ecosystem Integration/Flink TsFile.md | 180 +++++++++++++++++++++
.../UserGuide/Ecosystem Integration/Flink IoTDB.md | 122 ++++++++++++++
.../Ecosystem Integration/Flink Tsfile.md | 179 ++++++++++++++++++++
site/src/main/.vuepress/config.js | 8 +-
5 files changed, 611 insertions(+), 2 deletions(-)
diff --git a/docs/UserGuide/Ecosystem Integration/Flink IoTDB.md b/docs/UserGuide/Ecosystem Integration/Flink IoTDB.md
new file mode 100644
index 0000000..b0c4954
--- /dev/null
+++ b/docs/UserGuide/Ecosystem Integration/Flink IoTDB.md
@@ -0,0 +1,124 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+## IoTDB-Flink-Connector
+
+IoTDB integration for [Apache Flink](https://flink.apache.org/). This module includes the iotdb sink that allows a flink job to write events into timeseries.
+
+### IoTDBSink
+
+To use the `IoTDBSink`, you need construct an instance of it by specifying `IoTDBOptions` and `IoTSerializationSchema` instances.
+The `IoTDBSink` send only one event after another by default, but you can change to batch by invoking `withBatchSize(int)`.
+
+### Example
+
+This example shows a case that sends data to a IoTDB server from a Flink job:
+
+- A simulated Source `SensorSource` generates data points per 1 second.
+- Flink uses `IoTDBSink` to consume the generated data points and write the data into IoTDB.
+
+```java
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class FlinkIoTDBSink {
+ public static void main(String[] args) throws Exception {
+ // run the flink job on local mini cluster
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ IoTDBOptions options = new IoTDBOptions();
+ options.setHost("127.0.0.1");
+ options.setPort(6667);
+ options.setUser("root");
+ options.setPassword("root");
+ options.setStorageGroup("root.sg");
+
+ // If the server enables auto_create_schema, then we do not need to register all timeseries
+ // here.
+ options.setTimeseriesOptionList(
+ Lists.newArrayList(
+ new IoTDBOptions.TimeseriesOption(
+ "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY)));
+
+ IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
+ IoTDBSink ioTDBSink =
+ new IoTDBSink(options, serializationSchema)
+ // enable batching
+ .withBatchSize(10)
+ // how many connectons to the server will be created for each parallelism
+ .withSessionPoolSize(3);
+
+ env.addSource(new SensorSource())
+ .name("sensor-source")
+ .setParallelism(1)
+ .addSink(ioTDBSink)
+ .name("iotdb-sink");
+
+ env.execute("iotdb-flink-example");
+ }
+
+ private static class SensorSource implements SourceFunction<Map<String, String>> {
+ boolean running = true;
+ Random random = new SecureRandom();
+
+ @Override
+ public void run(SourceContext context) throws Exception {
+ while (running) {
+ Map<String, String> tuple = new HashMap();
+ tuple.put("device", "root.sg.d1");
+ tuple.put("timestamp", String.valueOf(System.currentTimeMillis()));
+ tuple.put("measurements", "s1");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", String.valueOf(random.nextDouble()));
+
+ context.collect(tuple);
+ Thread.sleep(1000);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+}
+
+```
+
+
+
+
+
+
+### Usage
+
+* Launch the IoTDB server.
+* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to run the flink job on local mini cluster.
diff --git a/docs/UserGuide/Ecosystem Integration/Flink TsFile.md b/docs/UserGuide/Ecosystem Integration/Flink TsFile.md
new file mode 100644
index 0000000..dfa5e89
--- /dev/null
+++ b/docs/UserGuide/Ecosystem Integration/Flink TsFile.md
@@ -0,0 +1,180 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+## TsFile-Flink-Connector
+
+### About TsFile-Flink-Connector
+
+TsFile-Flink-Connector implements the support of Flink for external data sources of Tsfile type.
+This enables users to read and write Tsfile by Flink via DataStream/DataSet API.
+
+With this connector, you can
+
+* load a single TsFile or multiple TsFiles(only for DataSet), from either the local file system or hdfs, into Flink
+* load all files in a specific directory, from either the local file system or hdfs, into Flink
+
+### Quick Start
+
+#### TsFileInputFormat Example
+
+1. create TsFileInputFormat with default RowRowRecordParser.
+
+```java
+String[] filedNames = {
+ QueryConstant.RESERVED_TIME,
+ "device_1.sensor_1",
+ "device_1.sensor_2",
+ "device_1.sensor_3",
+ "device_2.sensor_1",
+ "device_2.sensor_2",
+ "device_2.sensor_3"
+};
+TypeInformation[] typeInformations = new TypeInformation[] {
+ Types.LONG,
+ Types.FLOAT,
+ Types.INT,
+ Types.INT,
+ Types.FLOAT,
+ Types.INT,
+ Types.INT
+};
+List<Path> paths = Arrays.stream(filedNames)
+ .filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+ .map(Path::new)
+ .collect(Collectors.toList());
+RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+QueryExpression queryExpression = QueryExpression.create(paths, null);
+RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+TsFileInputFormat inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+```
+
+2. Read data from the input format and print to stdout:
+
+DataStream:
+
+```java
+StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataStream<Row> source = senv.createInput(inputFormat);
+DataStream<String> rowString = source.map(Row::toString);
+Iterator<String> result = DataStreamUtils.collect(rowString);
+while (result.hasNext()) {
+ System.out.println(result.next());
+}
+```
+
+DataSet:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataSet<Row> source = env.createInput(inputFormat);
+List<String> result = source.map(Row::toString).collect();
+for (String s : result) {
+ System.out.println(s);
+}
+```
+
+#### Example of TSRecordOutputFormat
+
+1. create TSRecordOutputFormat with default RowTSRecordConverter.
+
+```java
+String[] filedNames = {
+ QueryConstant.RESERVED_TIME,
+ "device_1.sensor_1",
+ "device_1.sensor_2",
+ "device_1.sensor_3",
+ "device_2.sensor_1",
+ "device_2.sensor_2",
+ "device_2.sensor_3"
+};
+TypeInformation[] typeInformations = new TypeInformation[] {
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG
+};
+RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+Schema schema = new Schema();
+schema.extendTemplate("template", new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
+schema.extendTemplate("template", new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
+schema.extendTemplate("template", new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));
+RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
+TSRecordOutputFormat<Row> outputFormat = new TSRecordOutputFormat<>(schema, converter);
+```
+
+2. write data via the output format:
+
+DataStream:
+
+```java
+StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+senv.setParallelism(1);
+List<Tuple7> data = new ArrayList<>(7);
+data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
+data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
+data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
+data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
+data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
+data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
+data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
+outputFormat.setOutputFilePath(new org.apache.flink.core.fs.Path(path));
+DataStream<Tuple7> source = senv.fromCollection(
+ data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
+source.map(t -> {
+ Row row = new Row(7);
+ for (int i = 0; i < 7; i++) {
+ row.setField(i, t.getField(i));
+ }
+ return row;
+}).returns(rowTypeInfo).writeUsingOutputFormat(outputFormat);
+senv.execute();
+```
+
+DataSet:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+List<Tuple7> data = new ArrayList<>(7);
+data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
+data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
+data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
+data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
+data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
+data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
+data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
+DataSet<Tuple7> source = env.fromCollection(
+ data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
+source.map(t -> {
+ Row row = new Row(7);
+ for (int i = 0; i < 7; i++) {
+ row.setField(i, t.getField(i));
+ }
+ return row;
+}).returns(rowTypeInfo).write(outputFormat, path);
+env.execute();
+```
+
diff --git a/docs/zh/UserGuide/Ecosystem Integration/Flink IoTDB.md b/docs/zh/UserGuide/Ecosystem Integration/Flink IoTDB.md
new file mode 100644
index 0000000..b739325
--- /dev/null
+++ b/docs/zh/UserGuide/Ecosystem Integration/Flink IoTDB.md
@@ -0,0 +1,122 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+## Flink IoTDB 连接器
+
+IoTDB 与 [Apache Flink](https://flink.apache.org/) 的集成. 此模块包含了 iotdb sink,允许 flink job 将时序数据写入IoTDB。
+
+### IoTDBSink
+
+使用 `IoTDBSink` ,您需要定义一个 `IoTDBOptions` 和一个 `IoTSerializationSchema` 实例。 `IoTDBSink` 默认每次发送一个数据,可以通过调用 `withBatchSize(int)` 进行调整。
+
+### 示例
+
+该示例演示了如下从一个 Flink job 中发送数据到 IoTDB server 的场景:
+
+- 一个模拟的 Source `SensorSource` 每秒钟产生一个数据点。
+
+- Flink使用 `IoTDBSink` 消费产生的数据并写入 IoTDB 。
+
+ ```java
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+ import com.google.common.collect.Lists;
+ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+ import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+ import java.security.SecureRandom;
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.Random;
+
+ public class FlinkIoTDBSink {
+ public static void main(String[] args) throws Exception {
+ // run the flink job on local mini cluster
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ IoTDBOptions options = new IoTDBOptions();
+ options.setHost("127.0.0.1");
+ options.setPort(6667);
+ options.setUser("root");
+ options.setPassword("root");
+ options.setStorageGroup("root.sg");
+
+ // If the server enables auto_create_schema, then we do not need to register all timeseries
+ // here.
+ options.setTimeseriesOptionList(
+ Lists.newArrayList(
+ new IoTDBOptions.TimeseriesOption(
+ "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY)));
+
+ IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
+ IoTDBSink ioTDBSink =
+ new IoTDBSink(options, serializationSchema)
+ // enable batching
+ .withBatchSize(10)
+ // how many connectons to the server will be created for each parallelism
+ .withSessionPoolSize(3);
+
+ env.addSource(new SensorSource())
+ .name("sensor-source")
+ .setParallelism(1)
+ .addSink(ioTDBSink)
+ .name("iotdb-sink");
+
+ env.execute("iotdb-flink-example");
+ }
+
+ private static class SensorSource implements SourceFunction<Map<String, String>> {
+ boolean running = true;
+ Random random = new SecureRandom();
+
+ @Override
+ public void run(SourceContext context) throws Exception {
+ while (running) {
+ Map<String, String> tuple = new HashMap();
+ tuple.put("device", "root.sg.d1");
+ tuple.put("timestamp", String.valueOf(System.currentTimeMillis()));
+ tuple.put("measurements", "s1");
+ tuple.put("types", "DOUBLE");
+ tuple.put("values", String.valueOf(random.nextDouble()));
+
+ context.collect(tuple);
+ Thread.sleep(1000);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+ }
+
+ ```
+
+
+
+
+### 运行方法
+
+* 启动 IoTDB server
+* 运行 `org.apache.iotdb.flink.FlinkIoTDBSink.java` 将 Flink job 运行在本地的集群上。
diff --git a/docs/zh/UserGuide/Ecosystem Integration/Flink Tsfile.md b/docs/zh/UserGuide/Ecosystem Integration/Flink Tsfile.md
new file mode 100644
index 0000000..477a7ad
--- /dev/null
+++ b/docs/zh/UserGuide/Ecosystem Integration/Flink Tsfile.md
@@ -0,0 +1,179 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+## TsFile-Flink 连接器
+
+### 关于 TsFile-Flink 连接器
+
+TsFile-Flink-Connector 对Tsfile类型的外部数据源实现 Flink 的支持。 这使用户可以通过 Flink DataStream/DataSet 进行读取,写入和查询。
+
+使用此连接器,您可以
+
+* 从本地文件系统或 hdfs 加载单个或多个 TsFile (只支持以DataSet的形式)到 Flink 。
+* 将本地文件系统或 hdfs 中特定目录中的所有文件加载到 Flink 中。
+
+### 快速开始
+
+#### TsFileInputFormat 示例
+
+1. 使用默认的 RowRowRecordParser 创建 TsFileInputFormat 。
+
+```java
+String[] filedNames = {
+ QueryConstant.RESERVED_TIME,
+ "device_1.sensor_1",
+ "device_1.sensor_2",
+ "device_1.sensor_3",
+ "device_2.sensor_1",
+ "device_2.sensor_2",
+ "device_2.sensor_3"
+};
+TypeInformation[] typeInformations = new TypeInformation[] {
+ Types.LONG,
+ Types.FLOAT,
+ Types.INT,
+ Types.INT,
+ Types.FLOAT,
+ Types.INT,
+ Types.INT
+};
+List<Path> paths = Arrays.stream(filedNames)
+ .filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+ .map(Path::new)
+ .collect(Collectors.toList());
+RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+QueryExpression queryExpression = QueryExpression.create(paths, null);
+RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+TsFileInputFormat inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+```
+
+2. 从输入格式读取数据并打印到标准输出 stdout:
+
+DataStream:
+
+```java
+StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataStream<Row> source = senv.createInput(inputFormat);
+DataStream<String> rowString = source.map(Row::toString);
+Iterator<String> result = DataStreamUtils.collect(rowString);
+while (result.hasNext()) {
+ System.out.println(result.next());
+}
+```
+
+DataSet:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataSet<Row> source = env.createInput(inputFormat);
+List<String> result = source.map(Row::toString).collect();
+for (String s : result) {
+ System.out.println(s);
+}
+```
+
+#### TSRecordOutputFormat 示例
+
+1. 使用默认的 RowTSRecordConverter 创建 TSRecordOutputFormat 。
+
+```java
+String[] filedNames = {
+ QueryConstant.RESERVED_TIME,
+ "device_1.sensor_1",
+ "device_1.sensor_2",
+ "device_1.sensor_3",
+ "device_2.sensor_1",
+ "device_2.sensor_2",
+ "device_2.sensor_3"
+};
+TypeInformation[] typeInformations = new TypeInformation[] {
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG
+};
+RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+Schema schema = new Schema();
+schema.extendTemplate("template", new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
+schema.extendTemplate("template", new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
+schema.extendTemplate("template", new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));
+RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
+TSRecordOutputFormat<Row> outputFormat = new TSRecordOutputFormat<>(schema, converter);
+```
+
+2. 通过输出格式写数据:
+
+DataStream:
+
+```java
+StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+senv.setParallelism(1);
+List<Tuple7> data = new ArrayList<>(7);
+data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
+data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
+data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
+data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
+data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
+data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
+data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
+outputFormat.setOutputFilePath(new org.apache.flink.core.fs.Path(path));
+DataStream<Tuple7> source = senv.fromCollection(
+ data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
+source.map(t -> {
+ Row row = new Row(7);
+ for (int i = 0; i < 7; i++) {
+ row.setField(i, t.getField(i));
+ }
+ return row;
+}).returns(rowTypeInfo).writeUsingOutputFormat(outputFormat);
+senv.execute();
+```
+
+DataSet:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+List<Tuple7> data = new ArrayList<>(7);
+data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
+data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
+data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
+data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
+data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
+data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
+data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
+DataSet<Tuple7> source = env.fromCollection(
+ data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
+source.map(t -> {
+ Row row = new Row(7);
+ for (int i = 0; i < 7; i++) {
+ row.setField(i, t.getField(i));
+ }
+ return row;
+}).returns(rowTypeInfo).write(outputFormat, path);
+env.execute();
+```
+
diff --git a/site/src/main/.vuepress/config.js b/site/src/main/.vuepress/config.js
index 8f8358d..01fffff 100644
--- a/site/src/main/.vuepress/config.js
+++ b/site/src/main/.vuepress/config.js
@@ -585,7 +585,9 @@ var config = {
['Ecosystem Integration/Spark TsFile','Spark TsFile'],
['Ecosystem Integration/Spark IoTDB','Spark IoTDB'],
['Ecosystem Integration/Hive TsFile','Hive TsFile'],
- ['Ecosystem Integration/Zeppelin-IoTDB','Zeppelin-IoTDB']
+ ['Ecosystem Integration/Zeppelin-IoTDB','Zeppelin-IoTDB'],
+ ['Ecosystem Integration/Flink IoTDB','Flink IoTDB'],
+ ['Ecosystem Integration/Flink Tsfile','Flink Tsfile']
]
},
{
@@ -1210,7 +1212,9 @@ var config = {
['Ecosystem Integration/Spark TsFile','Spark TsFile'],
['Ecosystem Integration/Spark IoTDB','Spark IoTDB'],
['Ecosystem Integration/Hive TsFile','Hive TsFile'],
- ['Ecosystem Integration/Zeppelin-IoTDB','Zeppelin-IoTDB']
+ ['Ecosystem Integration/Zeppelin-IoTDB','Zeppelin-IoTDB'],
+ ['Ecosystem Integration/Flink IoTDB','Flink IoTDB'],
+ ['Ecosystem Integration/Flink Tsfile','Flink Tsfile']
]
},
{