Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/08 12:25:43 UTC

[incubator-iotdb] 01/06: add TsFileInputFormat for reading TsFiles in flink.

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 0f782ad90b14ee0481d13e36cf144ead404c615c
Author: 仲炜 <zw...@alibaba-inc.com>
AuthorDate: Thu Mar 26 17:35:54 2020 +0800

    add TsFileInputFormat for reading TsFiles in flink.
---
 example/flink/README.md                            |   7 +
 example/flink/pom.xml                              |  10 +
 .../apache/iotdb/flink/FlinkTsFileBatchSource.java |  77 +++++++
 .../iotdb/flink/FlinkTsFileStreamSource.java       |  81 +++++++
 .../main/java/org/apache/iotdb/flink/Utils.java    |  69 ++++++
 flink-tsfile/README.md                             |  93 ++++++++
 {example/flink => flink-tsfile}/pom.xml            |  20 +-
 .../apache/iotdb/flink/tsfile/RowRecordParser.java |  46 ++++
 .../iotdb/flink/tsfile/RowRowRecordParser.java     | 120 +++++++++++
 .../iotdb/flink/tsfile/TsFileInputFormat.java      | 168 +++++++++++++++
 .../iotdb/flink/tsfile/util/TSFileConfigUtil.java  |  62 ++++++
 .../apache/iotdb/flink/tool/TsFileWriteTool.java   | 234 +++++++++++++++++++++
 .../flink/tsfile/RowTsFileInputFormatITCase.java   | 109 ++++++++++
 .../flink/tsfile/RowTsFileInputFormatTest.java     |  89 ++++++++
 .../flink/tsfile/RowTsFileInputFormatTestBase.java | 119 +++++++++++
 pom.xml                                            |   2 +
 .../iotdb/tsfile/common/conf/TSFileConfig.java     |   3 +-
 .../iotdb/tsfile/read/expression/IExpression.java  |   4 +-
 .../tsfile/read/expression/QueryExpression.java    |   3 +-
 19 files changed, 1308 insertions(+), 8 deletions(-)

diff --git a/example/flink/README.md b/example/flink/README.md
index fa3b867..56b92e4 100644
--- a/example/flink/README.md
+++ b/example/flink/README.md
@@ -28,3 +28,10 @@ The example is to show how to send data to a IoTDB server from a Flink job.
 ## Usage
 
 * Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to launch the local iotDB server and run the flink job on local mini cluster.
+
+# TsFile-Flink-Connector Example
+
+## Usage
+
+* Run `org.apache.iotdb.flink.FlinkTsFileBatchSource.java` to create a TsFile and read it via a Flink DataSet job on a local mini cluster.
+* Run `org.apache.iotdb.flink.FlinkTsFileStreamSource.java` to create a TsFile and read it via a Flink DataStream job on a local mini cluster.
diff --git a/example/flink/pom.xml b/example/flink/pom.xml
index efcd34d..f82d550 100644
--- a/example/flink/pom.xml
+++ b/example/flink/pom.xml
@@ -41,5 +41,15 @@
             <artifactId>iotdb-server</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>tsfile</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>flink-tsfile</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java
new file mode 100644
index 0000000..04df1df
--- /dev/null
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.flink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.iotdb.flink.tsfile.RowRowRecordParser;
+import org.apache.iotdb.flink.tsfile.TsFileInputFormat;
+import org.apache.iotdb.tsfile.common.constant.QueryConstant;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkTsFileBatchSource {
+
+	public static void main(String[] args) throws Exception {
+		String path = "test.tsfile";
+		Utils.writeTsFile(path);
+		new File(path).deleteOnExit();
+		String[] fieldNames = {
+			QueryConstant.RESERVED_TIME,
+			"device_1.sensor_1",
+			"device_1.sensor_2",
+			"device_1.sensor_3",
+			"device_2.sensor_1",
+			"device_2.sensor_2",
+			"device_2.sensor_3"
+		};
+		TypeInformation[] typeInformations = new TypeInformation[] {
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG
+		};
+		List<Path> paths = Arrays.stream(fieldNames)
+			.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+			.map(Path::new)
+			.collect(Collectors.toList());
+		RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
+		QueryExpression queryExpression = QueryExpression.create(paths, null);
+		RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+		TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		inputFormat.setFilePath(path);
+		DataSet<Row> source = env.createInput(inputFormat);
+		List<String> result = source.map(Row::toString).collect();
+		for (String s : result) {
+			System.out.println(s);
+		}
+	}
+}
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java
new file mode 100644
index 0000000..3750ea2
--- /dev/null
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.flink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.iotdb.flink.tsfile.RowRowRecordParser;
+import org.apache.iotdb.flink.tsfile.TsFileInputFormat;
+import org.apache.iotdb.tsfile.common.constant.QueryConstant;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkTsFileStreamSource {
+
+	public static void main(String[] args) throws IOException {
+		String path = "test.tsfile";
+		Utils.writeTsFile(path);
+		new File(path).deleteOnExit();
+		String[] fieldNames = {
+			QueryConstant.RESERVED_TIME,
+			"device_1.sensor_1",
+			"device_1.sensor_2",
+			"device_1.sensor_3",
+			"device_2.sensor_1",
+			"device_2.sensor_2",
+			"device_2.sensor_3"
+		};
+		TypeInformation[] typeInformations = new TypeInformation[] {
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG
+		};
+		List<Path> paths = Arrays.stream(fieldNames)
+			.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+			.map(Path::new)
+			.collect(Collectors.toList());
+		RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
+		QueryExpression queryExpression = QueryExpression.create(paths, null);
+		RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+		TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+		StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+		inputFormat.setFilePath("source.tsfile");
+		DataStream<Row> source = senv.createInput(inputFormat);
+		DataStream<String> rowString = source.map(Row::toString);
+		Iterator<String> result = DataStreamUtils.collect(rowString);
+		while (result.hasNext()) {
+			System.out.println(result.next());
+		}
+	}
+}
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/Utils.java b/example/flink/src/main/java/org/apache/iotdb/flink/Utils.java
new file mode 100644
index 0000000..e30ce40
--- /dev/null
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/Utils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.flink;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.io.File;
+
+public class Utils {
+
+	public static void writeTsFile(String path) {
+		try {
+			File f = FSFactoryProducer.getFSFactory().getFile(path);
+			if (f.exists()) {
+				f.delete();
+			}
+			TsFileWriter tsFileWriter = new TsFileWriter(f);
+
+			// add measurements into file schema
+			tsFileWriter
+				.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.RLE));
+			tsFileWriter
+				.addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.RLE));
+			tsFileWriter
+				.addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.RLE));
+
+			// construct TSRecord
+			for (int i = 0; i < 100; i++) {
+				TSRecord tsRecord = new TSRecord(i, "device_" + (i % 4));
+				DataPoint dPoint1 = new LongDataPoint("sensor_1", i);
+				DataPoint dPoint2 = new LongDataPoint("sensor_2", i);
+				DataPoint dPoint3 = new LongDataPoint("sensor_3", i);
+				tsRecord.addTuple(dPoint1);
+				tsRecord.addTuple(dPoint2);
+				tsRecord.addTuple(dPoint3);
+
+				// write TSRecord
+				tsFileWriter.write(tsRecord);
+			}
+
+			tsFileWriter.close();
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+}
diff --git a/flink-tsfile/README.md b/flink-tsfile/README.md
new file mode 100644
index 0000000..1f3f28c
--- /dev/null
+++ b/flink-tsfile/README.md
@@ -0,0 +1,93 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+# TsFile-Flink-Connector User Guide
+
+## 1. About TsFile-Flink-Connector
+
+The TsFile-Flink-Connector adds Flink support for external data sources of the TsFile type.
+This enables users to read and query TsFiles with Flink via the DataStream/DataSet API.
+
+With this connector, you can
+* load a single TsFile or multiple TsFiles (multiple files only for DataSet), from either the local file system or HDFS, into Flink
+* load all files in a specific directory, from either the local file system or HDFS, into Flink (see the path sketch below)
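+
+The file path can point at either file system. A minimal sketch (assuming an `inputFormat` like the one built in the Quick Start below; the HDFS URI is a placeholder, not from this commit):
+
+```java
+// local file system
+inputFormat.setFilePath("source.tsfile");
+// HDFS (placeholder namenode address)
+inputFormat.setFilePath("hdfs://namenode:9000/path/to/source.tsfile");
+```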
+
+## 2. Quick Start
+
+### TsFileInputFormat Example
+
+1. Create a TsFileInputFormat with the default RowRowRecordParser.
+
+```java
+String[] fieldNames = {
+	QueryConstant.RESERVED_TIME,
+	"device_1.sensor_1",
+	"device_1.sensor_2",
+	"device_1.sensor_3",
+	"device_2.sensor_1",
+	"device_2.sensor_2",
+	"device_2.sensor_3"
+};
+TypeInformation[] typeInformations = new TypeInformation[] {
+	Types.LONG,
+	Types.FLOAT,
+	Types.INT,
+	Types.INT,
+	Types.FLOAT,
+	Types.INT,
+	Types.INT
+};
+List<Path> paths = Arrays.stream(fieldNames)
+	.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+	.map(Path::new)
+	.collect(Collectors.toList());
+RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
+QueryExpression queryExpression = QueryExpression.create(paths, null);
+RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+```
+
+2. Read data from the input format and print to stdout:
+
+DataStream:
+
+```java
+StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataStream<Row> source = senv.createInput(inputFormat);
+DataStream<String> rowString = source.map(Row::toString);
+Iterator<String> result = DataStreamUtils.collect(rowString);
+while (result.hasNext()) {
+	System.out.println(result.next());
+}
+```
+
+DataSet:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataSet<Row> source = env.createInput(inputFormat);
+List<String> result = source.map(Row::toString).collect();
+for (String s : result) {
+	System.out.println(s);
+}
+```
+
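+The constructor also accepts an optional `TSFileConfig` that is applied as the global TsFile configuration before reading. A minimal sketch, reusing `queryExpression` and `parser` from step 1 (the batch size value here is arbitrary):
+
+```java
+TSFileConfig config = new TSFileConfig();
+config.setBatchSize(500);
+TsFileInputFormat<Row> inputFormat =
+	new TsFileInputFormat<>("source.tsfile", queryExpression, parser, config);
+```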
diff --git a/example/flink/pom.xml b/flink-tsfile/pom.xml
similarity index 72%
copy from example/flink/pom.xml
copy to flink-tsfile/pom.xml
index efcd34d..0a66fa2 100644
--- a/example/flink/pom.xml
+++ b/flink-tsfile/pom.xml
@@ -23,23 +23,33 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.iotdb</groupId>
-        <artifactId>iotdb-examples</artifactId>
+        <artifactId>iotdb-parent</artifactId>
         <version>0.10.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
-    <artifactId>flink-example</artifactId>
-    <name>IoTDB-Flink Examples</name>
+    <artifactId>flink-tsfile</artifactId>
     <packaging>jar</packaging>
+    <name>IoTDB Flink-TsFile</name>
     <dependencies>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
-            <artifactId>flink-iotdb-connector</artifactId>
+            <artifactId>tsfile</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
-            <artifactId>iotdb-server</artifactId>
+            <artifactId>hadoop-tsfile</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java
new file mode 100644
index 0000000..72741a9
--- /dev/null
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import java.io.Serializable;
+
+/**
+ * RowRecordParser parses the RowRecord objects read from a TsFile into the user-desired format.
+ * If accurate type information of the parse result cannot be extracted from the result type class automatically
+ * (e.g. Row, Tuple, etc.), the {@link ResultTypeQueryable} interface needs to be implemented to provide the type
+ * information explicitly.
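+ *
+ * <p>A minimal sketch of a parser that keeps only the timestamp (the interface has a single
+ * abstract method, so a lambda suffices):
+ * <pre>{@code
+ * RowRecordParser<Long> timestampParser = (rowRecord, reuse) -> rowRecord.getTimestamp();
+ * }</pre>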
+ *
+ * @param <T> The type of the parse result.
+ */
+public interface RowRecordParser<T> extends Serializable {
+
+	/**
+	 * Parses the row record into type T. Filling and returning the `reuse` object is recommended, to reduce the
+	 * creation of new objects.
+	 *
+	 * @param rowRecord The input row record.
+	 * @param reuse The object that can be reused.
+	 * @return The parsed result.
+	 */
+	T parse(RowRecord rowRecord, T reuse);
+}
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java
new file mode 100644
index 0000000..0ea9bce
--- /dev/null
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.iotdb.tsfile.common.constant.QueryConstant;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * The parser that parses the RowRecord objects read from a TsFile into Flink Row objects.
+ */
+public class RowRowRecordParser implements RowRecordParser<Row>, ResultTypeQueryable<Row> {
+
+	private final int[] indexMapping;
+	private final RowTypeInfo rowTypeInfo;
+
+	public RowRowRecordParser(int[] indexMapping, RowTypeInfo rowTypeInfo) {
+		this.indexMapping = indexMapping;
+		this.rowTypeInfo = rowTypeInfo;
+	}
+
+	@Override
+	public Row parse(RowRecord rowRecord, Row reuse) {
+		List<Field> fields = rowRecord.getFields();
+		for (int i = 0; i < indexMapping.length; i++) {
+			if (indexMapping[i] < 0) {
+				// A negative index is treated as the marker of the timestamp.
+				reuse.setField(i, rowRecord.getTimestamp());
+			} else {
+				reuse.setField(i, toSqlValue(fields.get(indexMapping[i])));
+			}
+		}
+		return reuse;
+	}
+
+	private Object toSqlValue(Field field) {
+		if (field == null) {
+			return null;
+		} else if (field.getDataType() == null) {
+			return null;
+		} else {
+			switch (field.getDataType()) {
+				case BOOLEAN:
+					return field.getBoolV();
+				case INT32:
+					return field.getIntV();
+				case INT64:
+					return field.getLongV();
+				case FLOAT:
+					return field.getFloatV();
+				case DOUBLE:
+					return field.getDoubleV();
+				case TEXT:
+					return field.getStringValue();
+				default:
+					throw new UnsupportedOperationException(
+						String.format("Unsupported type %s", field.getDataType()));
+			}
+		}
+	}
+
+	@Override
+	public TypeInformation<Row> getProducedType() {
+		return rowTypeInfo;
+	}
+
+	/**
+	 * Creates a RowRowRecordParser from the output RowTypeInfo and the selected series in the RowRecord. The row
+	 * field "time" is used to store the timestamp value. The other row fields store the values of the RowRecord
+	 * fields with the same names.
+	 *
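+	 * <p>A minimal sketch, assuming a two-field output of time plus one series (names taken from the
+	 * examples in this commit):
+	 * <pre>{@code
+	 * RowTypeInfo typeInfo = new RowTypeInfo(
+	 *     new TypeInformation[] {Types.LONG, Types.FLOAT},
+	 *     new String[] {QueryConstant.RESERVED_TIME, "device_1.sensor_1"});
+	 * RowRowRecordParser parser = RowRowRecordParser.create(typeInfo, queryExpression.getSelectedSeries());
+	 * }</pre>
+	 *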
+	 * @param outputRowTypeInfo The RowTypeInfo of the output row.
+	 * @param selectedSeries The selected series in the RowRecord.
+	 * @return The RowRowRecordParser.
+	 */
+	public static RowRowRecordParser create(RowTypeInfo outputRowTypeInfo, List<Path> selectedSeries) {
+		List<String> selectedSeriesNames = selectedSeries.stream().map(Path::toString).collect(Collectors.toList());
+		String[] rowFieldNames = outputRowTypeInfo.getFieldNames();
+		int[] indexMapping = new int[outputRowTypeInfo.getArity()];
+		for (int i = 0; i < outputRowTypeInfo.getArity(); i++) {
+			if (!QueryConstant.RESERVED_TIME.equals(rowFieldNames[i])) {
+				int index = selectedSeriesNames.indexOf(rowFieldNames[i]);
+				if (index >= 0) {
+					indexMapping[i] = index;
+				} else {
+					throw new IllegalArgumentException(rowFieldNames[i] + " is not found in the selected series.");
+				}
+			} else {
+				// marked as timestamp field.
+				indexMapping[i] = -1;
+			}
+		}
+		return new RowRowRecordParser(indexMapping, outputRowTypeInfo);
+	}
+}
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
new file mode 100644
index 0000000..d58d38d
--- /dev/null
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.iotdb.flink.tsfile.util.TSFileConfigUtil;
+import org.apache.iotdb.hadoop.fileSystem.HDFSInput;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+/**
+ * Input format that reads TsFiles. Users need to provide a {@link RowRecordParser} that parses the raw data read
+ * from TsFiles into the output type T.
+ *
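+ * <p>Typical construction, as a sketch (the expression and parser are supplied by the caller):
+ * <pre>{@code
+ * TsFileInputFormat<Row> format = new TsFileInputFormat<>(queryExpression, parser);
+ * format.setFilePath("source.tsfile");
+ * }</pre>
+ *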
+ * @param <T> The output type of this input format.
+ */
+public class TsFileInputFormat<T> extends FileInputFormat<T> implements ResultTypeQueryable<T> {
+
+	private final QueryExpression expression;
+	private final RowRecordParser<T> parser;
+	@Nullable
+	private final TSFileConfig config;
+
+	private transient org.apache.hadoop.conf.Configuration hadoopConf = null;
+	private transient ReadOnlyTsFile readTsFile = null;
+	private transient QueryDataSet queryDataSet = null;
+
+	public TsFileInputFormat(
+			@Nullable String path,
+			QueryExpression expression,
+			RowRecordParser<T> parser,
+			@Nullable TSFileConfig config) {
+		super(path != null ? new Path(path) : null);
+		this.expression = expression;
+		this.parser = parser;
+		this.config = config;
+	}
+
+	public TsFileInputFormat(@Nullable String path, QueryExpression expression, RowRecordParser<T> parser) {
+		this(path, expression, parser, null);
+	}
+
+	public TsFileInputFormat(QueryExpression expression, RowRecordParser<T> parser) {
+		this(null, expression, parser, null);
+	}
+
+	@Override
+	public void configure(Configuration flinkConfiguration) {
+		super.configure(flinkConfiguration);
+		hadoopConf = HadoopUtils.getHadoopConfiguration(flinkConfiguration);
+	}
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+		if (config != null) {
+			TSFileConfigUtil.setGlobalTSFileConfig(config);
+		}
+		TsFileInput in;
+		try {
+			if (currentSplit.getPath().getFileSystem().isDistributedFS()) {
+				// HDFS
+				in = new HDFSInput(new org.apache.hadoop.fs.Path(new URI(currentSplit.getPath().getPath())),
+					hadoopConf);
+			} else {
+				// Local File System
+				in = new DefaultTsFileInput(Paths.get(currentSplit.getPath().getPath()));
+			}
+		} catch (URISyntaxException e) {
+			throw new FlinkRuntimeException(e);
+		}
+		TsFileSequenceReader reader = new TsFileSequenceReader(in);
+		readTsFile = new ReadOnlyTsFile(reader);
+		queryDataSet = readTsFile.query(
+			// The query method call changes the content of its query expression param, so a fresh
+			// copy is passed here; the original query expression must not be handed to the query
+			// method, as it may be used several times.
+			QueryExpression.create(expression.getSelectedSeries(), expression.getExpression()),
+			currentSplit.getStart(),
+			currentSplit.getStart() + currentSplit.getLength());
+	}
+
+	@Override
+	public void close() throws IOException {
+		super.close();
+		if (readTsFile != null) {
+			readTsFile.close();
+			readTsFile = null;
+		}
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return !queryDataSet.hasNext();
+	}
+
+	@Override
+	public T nextRecord(T t) throws IOException {
+		RowRecord rowRecord = queryDataSet.next();
+		return parser.parse(rowRecord, t);
+	}
+
+	@Override
+	public boolean supportsMultiPaths() {
+		return true;
+	}
+
+	public QueryExpression getExpression() {
+		return expression;
+	}
+
+	public RowRecordParser<T> getParser() {
+		return parser;
+	}
+
+	public Optional<TSFileConfig> getConfig() {
+		return Optional.ofNullable(config);
+	}
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		if (this.getParser() instanceof ResultTypeQueryable) {
+			return ((ResultTypeQueryable) this.getParser()).getProducedType();
+		} else {
+			return TypeExtractor.createTypeInfo(
+				RowRecordParser.class,
+				this.getParser().getClass(), 0, null, null);
+		}
+	}
+}
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
new file mode 100644
index 0000000..5295cf4
--- /dev/null
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile.util;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+public class TSFileConfigUtil {
+	
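+	/**
+	 * Copies every field of the given TSFileConfig into the global config held by
+	 * {@link TSFileDescriptor}, so that TsFile readers in this JVM pick up the job-provided settings.
+	 */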
+	public static void setGlobalTSFileConfig(TSFileConfig config) {
+		TSFileConfig globalConfig = TSFileDescriptor.getInstance().getConfig();
+
+		globalConfig.setBatchSize(config.getBatchSize());
+		globalConfig.setBloomFilterErrorRate(config.getBloomFilterErrorRate());
+		globalConfig.setCompressor(config.getCompressor().toString());
+		globalConfig.setCoreSitePath(config.getCoreSitePath());
+		globalConfig.setDeltaBlockSize(config.getDeltaBlockSize());
+		globalConfig.setDfsClientFailoverProxyProvider(config.getDfsClientFailoverProxyProvider());
+		globalConfig.setDfsHaAutomaticFailoverEnabled(config.isDfsHaAutomaticFailoverEnabled());
+		globalConfig.setDfsHaNamenodes(config.getDfsHaNamenodes());
+		globalConfig.setDfsNameServices(config.getDfsNameServices());
+		globalConfig.setDftSatisfyRate(config.getDftSatisfyRate());
+		globalConfig.setEndian(config.getEndian());
+		globalConfig.setFloatPrecision(config.getFloatPrecision());
+		globalConfig.setFreqType(config.getFreqType());
+		globalConfig.setGroupSizeInByte(config.getGroupSizeInByte());
+		globalConfig.setHdfsIp(config.getHdfsIp());
+		globalConfig.setHdfsPort(config.getHdfsPort());
+		globalConfig.setHdfsSitePath(config.getHdfsSitePath());
+		globalConfig.setKerberosKeytabFilePath(config.getKerberosKeytabFilePath());
+		globalConfig.setKerberosPrincipal(config.getKerberosPrincipal());
+		globalConfig.setMaxNumberOfPointsInPage(config.getMaxNumberOfPointsInPage());
+		globalConfig.setMaxStringLength(config.getMaxStringLength());
+		globalConfig.setPageCheckSizeThreshold(config.getPageCheckSizeThreshold());
+		globalConfig.setPageSizeInByte(config.getPageSizeInByte());
+		globalConfig.setPlaMaxError(config.getPlaMaxError());
+		globalConfig.setRleBitWidth(config.getRleBitWidth());
+		globalConfig.setSdtMaxError(config.getSdtMaxError());
+		globalConfig.setTimeEncoder(config.getTimeEncoder());
+		globalConfig.setTimeSeriesDataType(config.getTimeSeriesDataType());
+		globalConfig.setTSFileStorageFs(config.getTSFileStorageFs().toString());
+		globalConfig.setUseKerberos(config.isUseKerberos());
+		globalConfig.setValueEncoder(config.getValueEncoder());
+	}
+}
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tool/TsFileWriteTool.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tool/TsFileWriteTool.java
new file mode 100644
index 0000000..bc65ee2
--- /dev/null
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tool/TsFileWriteTool.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tool;
+
+import java.io.File;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+/**
+ * A tool that writes example data into TsFiles for the tests.
+ */
+public class TsFileWriteTool {
+
+	public static final String TMP_DIR = "target";
+
+	public static void create1(String tsfilePath) throws Exception {
+		File f = new File(tsfilePath);
+		if (f.exists()) {
+			f.delete();
+		}
+		TsFileWriter tsFileWriter = new TsFileWriter(f);
+
+		// add measurements into file schema
+		tsFileWriter
+			.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
+		tsFileWriter
+			.addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT32, TSEncoding.TS_2DIFF));
+		tsFileWriter
+			.addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT32, TSEncoding.TS_2DIFF));
+
+		// construct TSRecord
+		TSRecord tsRecord = new TSRecord(1, "device_1");
+		DataPoint dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
+		DataPoint dPoint2 = new IntDataPoint("sensor_2", 20);
+		DataPoint dPoint3;
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+
+		// write a TSRecord to TsFile
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(2, "device_1");
+		dPoint2 = new IntDataPoint("sensor_2", 20);
+		dPoint3 = new IntDataPoint("sensor_3", 50);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(3, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 1.4f);
+		dPoint2 = new IntDataPoint("sensor_2", 21);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(4, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 20);
+		dPoint3 = new IntDataPoint("sensor_3", 51);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(6, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 7.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 10);
+		dPoint3 = new IntDataPoint("sensor_3", 11);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(7, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 6.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 20);
+		dPoint3 = new IntDataPoint("sensor_3", 21);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(8, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 9.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 30);
+		dPoint3 = new IntDataPoint("sensor_3", 31);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(1, "device_2");
+		dPoint1 = new FloatDataPoint("sensor_1", 2.3f);
+		dPoint2 = new IntDataPoint("sensor_2", 11);
+		dPoint3 = new IntDataPoint("sensor_3", 19);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(2, "device_2");
+		dPoint1 = new FloatDataPoint("sensor_1", 25.4f);
+		dPoint2 = new IntDataPoint("sensor_2", 10);
+		dPoint3 = new IntDataPoint("sensor_3", 21);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		// close TsFile
+		tsFileWriter.close();
+	}
+
+	public static void create2(String tsfilePath) throws Exception {
+		File f = new File(tsfilePath);
+		if (f.exists()) {
+			f.delete();
+		}
+		TsFileWriter tsFileWriter = new TsFileWriter(f);
+
+		// add measurements into file schema
+		tsFileWriter
+			.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
+		tsFileWriter
+			.addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT32, TSEncoding.TS_2DIFF));
+		tsFileWriter
+			.addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT32, TSEncoding.TS_2DIFF));
+
+		// construct TSRecord
+		TSRecord tsRecord = new TSRecord(9, "device_1");
+		DataPoint dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
+		DataPoint dPoint2 = new IntDataPoint("sensor_2", 20);
+		DataPoint dPoint3;
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+
+		// write a TSRecord to TsFile
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(10, "device_1");
+		dPoint2 = new IntDataPoint("sensor_2", 20);
+		dPoint3 = new IntDataPoint("sensor_3", 50);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(11, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 1.4f);
+		dPoint2 = new IntDataPoint("sensor_2", 21);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(12, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 20);
+		dPoint3 = new IntDataPoint("sensor_3", 51);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(14, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 7.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 10);
+		dPoint3 = new IntDataPoint("sensor_3", 11);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(15, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 6.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 20);
+		dPoint3 = new IntDataPoint("sensor_3", 21);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(16, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 9.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 30);
+		dPoint3 = new IntDataPoint("sensor_3", 31);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(9, "device_2");
+		dPoint1 = new FloatDataPoint("sensor_1", 2.3f);
+		dPoint2 = new IntDataPoint("sensor_2", 11);
+		dPoint3 = new IntDataPoint("sensor_3", 19);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(10, "device_2");
+		dPoint1 = new FloatDataPoint("sensor_1", 25.4f);
+		dPoint2 = new IntDataPoint("sensor_2", 10);
+		dPoint3 = new IntDataPoint("sensor_3", 21);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		// close TsFile
+		tsFileWriter.close();
+	}
+}
\ No newline at end of file
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java
new file mode 100644
index 0000000..5198152
--- /dev/null
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterators;
+import java.util.stream.StreamSupport;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * ITCases for RowTsFileInputFormat.
+ */
+public class RowTsFileInputFormatITCase extends RowTsFileInputFormatTestBase {
+
+	private ExecutionEnvironment env;
+	private StreamExecutionEnvironment senv;
+
+	@Before
+	public void prepareEnv() {
+		env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		senv = StreamExecutionEnvironment.getExecutionEnvironment();
+		senv.setParallelism(1);
+	}
+
+	@Test
+	public void testBatchExecution() throws Exception {
+		// read multiple files
+		TsFileInputFormat<Row> inputFormat = prepareInputFormat(null);
+		inputFormat.setFilePaths(sourceTsFilePath1, sourceTsFilePath2);
+		DataSet<Row> source = env.createInput(inputFormat);
+		List<String> result = source.map(Row::toString).collect();
+		Collections.sort(result);
+		String[] expected = {
+			"1,1.2,20,null,2.3,11,19",
+			"10,null,20,50,25.4,10,21",
+			"11,1.4,21,null,null,null,null",
+			"12,1.2,20,51,null,null,null",
+			"14,7.2,10,11,null,null,null",
+			"15,6.2,20,21,null,null,null",
+			"16,9.2,30,31,null,null,null",
+			"2,null,20,50,25.4,10,21",
+			"3,1.4,21,null,null,null,null",
+			"4,1.2,20,51,null,null,null",
+			"6,7.2,10,11,null,null,null",
+			"7,6.2,20,21,null,null,null",
+			"8,9.2,30,31,null,null,null",
+			"9,1.2,20,null,2.3,11,19"
+		};
+		assertArrayEquals(expected, result.toArray());
+	}
+
+	@Test
+	public void testStreamExecution() throws Exception {
+		// read files in a directory
+		TsFileInputFormat<Row> inputFormat = prepareInputFormat(tmpDir);
+		DataStream<Row> source = senv.createInput(inputFormat);
+		Iterator<String> rowStringIterator = DataStreamUtils.collect(source.map(Row::toString));
+		String[] result = StreamSupport.stream(
+			Spliterators.spliteratorUnknownSize(rowStringIterator, 0),
+			false).sorted().toArray(String[]::new);
+		String[] expected = {
+			"1,1.2,20,null,2.3,11,19",
+			"10,null,20,50,25.4,10,21",
+			"11,1.4,21,null,null,null,null",
+			"12,1.2,20,51,null,null,null",
+			"14,7.2,10,11,null,null,null",
+			"15,6.2,20,21,null,null,null",
+			"16,9.2,30,31,null,null,null",
+			"2,null,20,50,25.4,10,21",
+			"3,1.4,21,null,null,null,null",
+			"4,1.2,20,51,null,null,null",
+			"6,7.2,10,11,null,null,null",
+			"7,6.2,20,21,null,null,null",
+			"8,9.2,30,31,null,null,null",
+			"9,1.2,20,null,2.3,11,19"
+		};
+		assertArrayEquals(expected, result);
+	}
+}
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java
new file mode 100644
index 0000000..62f4a9b
--- /dev/null
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.Row;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for RowTsFileInputFormat.
+ */
+public class RowTsFileInputFormatTest extends RowTsFileInputFormatTestBase {
+
+	@Test
+	public void testReadData() throws IOException {
+		TsFileInputFormat<Row> inputFormat = prepareInputFormat(sourceTsFilePath1);
+
+		List<String> actual = new ArrayList<>();
+
+		try {
+			inputFormat.configure(new Configuration());
+			inputFormat.openInputFormat();
+			FileInputSplit[] inputSplits = inputFormat.createInputSplits(2);
+			Row reuse = rowTypeInfo.createSerializer(new ExecutionConfig()).createInstance();
+			for (FileInputSplit inputSplit : inputSplits) {
+				try {
+					inputFormat.open(inputSplit);
+					assertEquals(config.getBatchSize(), TSFileDescriptor.getInstance().getConfig().getBatchSize());
+					while (!inputFormat.reachedEnd()) {
+						Row row = inputFormat.nextRecord(reuse);
+						actual.add(row.toString());
+					}
+				} finally {
+					inputFormat.close();
+				}
+			}
+		} finally {
+			inputFormat.closeInputFormat();
+		}
+
+		String[] expected = {
+			"1,1.2,20,null,2.3,11,19",
+			"2,null,20,50,25.4,10,21",
+			"3,1.4,21,null,null,null,null",
+			"4,1.2,20,51,null,null,null",
+			"6,7.2,10,11,null,null,null",
+			"7,6.2,20,21,null,null,null",
+			"8,9.2,30,31,null,null,null"
+		};
+		assertArrayEquals(expected, actual.toArray());
+	}
+
+	@Test
+	public void testGetter() {
+		TsFileInputFormat<Row> inputFormat = prepareInputFormat(sourceTsFilePath1);
+
+		assertEquals(parser, inputFormat.getParser());
+		assertEquals(queryExpression, inputFormat.getExpression());
+		assertEquals(config, inputFormat.getConfig().get());
+		assertEquals(parser.getProducedType(), inputFormat.getProducedType());
+	}
+}
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
new file mode 100644
index 0000000..0e5f14c
--- /dev/null
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.iotdb.flink.tool.TsFileWriteTool;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.constant.QueryConstant;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Base class of the TsFileInputFormat tests.
+ */
+public abstract class RowTsFileInputFormatTestBase {
+
+	protected String tmpDir;
+	protected String sourceTsFilePath1;
+	protected String sourceTsFilePath2;
+	protected RowTypeInfo rowTypeInfo;
+	protected TSFileConfig config;
+	protected RowRowRecordParser parser;
+	protected QueryExpression queryExpression;
+
+	@Before
+	public void prepareSourceTsFile() throws Exception {
+		tmpDir = String.join(
+			File.separator,
+			TsFileWriteTool.TMP_DIR,
+			UUID.randomUUID().toString());
+		new File(tmpDir).mkdirs();
+		sourceTsFilePath1 = String.join(
+			File.separator,
+			tmpDir, "source1.tsfile");
+		sourceTsFilePath2 = String.join(
+			File.separator,
+			tmpDir, "source2.tsfile");
+		TsFileWriteTool.create1(sourceTsFilePath1);
+		TsFileWriteTool.create2(sourceTsFilePath2);
+	}
+
+	@After
+	public void removeSourceTsFile() {
+		File sourceTsFile1 = new File(sourceTsFilePath1);
+		if (sourceTsFile1.exists()) {
+			sourceTsFile1.delete();
+		}
+		File sourceTsFile2 = new File(sourceTsFilePath2);
+		if (sourceTsFile2.exists()) {
+			sourceTsFile2.delete();
+		}
+		File tmpDirFile = new File(tmpDir);
+		if (tmpDirFile.exists()) {
+			tmpDirFile.delete();
+		}
+	}
+
+	protected TsFileInputFormat<Row> prepareInputFormat(String filePath) {
+		String[] fieldNames = {
+			QueryConstant.RESERVED_TIME,
+			"device_1.sensor_1",
+			"device_1.sensor_2",
+			"device_1.sensor_3",
+			"device_2.sensor_1",
+			"device_2.sensor_2",
+			"device_2.sensor_3"
+		};
+		TypeInformation[] typeInformations = new TypeInformation[] {
+			Types.LONG,
+			Types.FLOAT,
+			Types.INT,
+			Types.INT,
+			Types.FLOAT,
+			Types.INT,
+			Types.INT
+		};
+		List<Path> paths = Arrays.stream(fieldNames)
+			.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+			.map(Path::new)
+			.collect(Collectors.toList());
+		config = new TSFileConfig();
+		config.setBatchSize(500);
+		rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
+		queryExpression = QueryExpression.create(paths, null);
+		parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+		return new TsFileInputFormat<>(filePath, queryExpression, parser, config);
+	}
+}
diff --git a/pom.xml b/pom.xml
index 1c4c80a..7b5b081 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
         <module>example</module>
         <module>grafana</module>
         <module>spark-tsfile</module>
+        <module>flink-tsfile</module>
         <module>hadoop</module>
         <module>spark-iotdb-connector</module>
         <module>flink-iotdb-connector</module>
@@ -108,6 +109,7 @@
         <logback.version>1.1.11</logback.version>
         <joda.version>2.9.9</joda.version>
         <spark.version>2.4.3</spark.version>
+        <flink.version>1.10.0</flink.version>
         <common.io.version>2.5</common.io.version>
         <commons.collections4>4.0</commons.collections4>
         <thrift.version>0.13.0</thrift.version>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index 9a97c65..c5c8627 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.common.conf;
 
+import java.io.Serializable;
 import java.nio.charset.Charset;
 
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -27,7 +28,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSType;
  * TSFileConfig is a configure class. Every variables is public and has default
  * value.
  */
-public class TSFileConfig {
+public class TSFileConfig implements Serializable {
 
   // Memory configuration
   public static final int RLE_MIN_REPEATED_NUM = 8;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
index 4f7feae..93bd706 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
@@ -18,7 +18,9 @@
  */
 package org.apache.iotdb.tsfile.read.expression;
 
-public interface IExpression {
+import java.io.Serializable;
+
+public interface IExpression extends Serializable {
 
   ExpressionType getType();
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java
index 9d068fc..996d918 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java
@@ -18,12 +18,13 @@
  */
 package org.apache.iotdb.tsfile.read.expression;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 
-public class QueryExpression {
+public class QueryExpression implements Serializable {
 
   private List<Path> selectedSeries;
   private List<TSDataType> dataTypes;