You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/08 12:25:42 UTC

[incubator-iotdb] branch master updated (48b0360 -> b24cdde)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 48b0360  not delete incompatible file in file level api (#1007)
     new 0f782ad  add TsFileInputFormat for reading TsFiles in flink.
     new 2feb0ce  try to fix windows test
     new bee90f7  add docs and rename util classes.
     new 4652631  address comments and push to test travis changes.
     new 4846b4a  fix compile error caused by rebase
     new b24cdde  fix compile error caused by rebase and add a new test to help maintain the TSFileConfigUtil.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .travis.yml                                        |   2 +-
 example/flink/README.md                            |   7 +
 example/flink/pom.xml                              |  10 +
 .../apache/iotdb/flink/FlinkTsFileBatchSource.java |  80 +++++++
 .../iotdb/flink/FlinkTsFileStreamSource.java       |  84 ++++++++
 .../java/org/apache/iotdb/flink/TsFlieUtils.java   |  72 +++++++
 flink-tsfile-connector/README.md                   |  93 +++++++++
 {example/flink => flink-tsfile-connector}/pom.xml  |  19 +-
 .../apache/iotdb/flink/tsfile/RowRecordParser.java |  46 +++++
 .../iotdb/flink/tsfile/RowRowRecordParser.java     | 120 +++++++++++
 .../iotdb/flink/tsfile/TsFileInputFormat.java      | 168 +++++++++++++++
 .../iotdb/flink/tsfile/util/TSFileConfigUtil.java  |  65 ++++++
 .../flink/tsfile/RowTsFileInputFormatITCase.java   | 109 ++++++++++
 .../flink/tsfile/RowTsFileInputFormatTest.java     |  89 ++++++++
 .../flink/tsfile/RowTsFileInputFormatTestBase.java | 118 +++++++++++
 .../util/TSFileConfigUtilCompletenessTest.java     |  84 ++++++++
 .../apache/iotdb/flink/util/TsFileWriteUtil.java   | 230 +++++++++++++++++++++
 pom.xml                                            |   2 +
 .../iotdb/tsfile/common/conf/TSFileConfig.java     |   3 +-
 .../iotdb/tsfile/read/expression/IExpression.java  |   4 +-
 .../tsfile/read/expression/QueryExpression.java    |   3 +-
 21 files changed, 1399 insertions(+), 9 deletions(-)
 create mode 100644 example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java
 create mode 100644 example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java
 create mode 100644 example/flink/src/main/java/org/apache/iotdb/flink/TsFlieUtils.java
 create mode 100644 flink-tsfile-connector/README.md
 copy {example/flink => flink-tsfile-connector}/pom.xml (73%)
 create mode 100644 flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java
 create mode 100644 flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java
 create mode 100644 flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
 create mode 100644 flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
 create mode 100644 flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java
 create mode 100644 flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java
 create mode 100644 flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
 create mode 100644 flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java
 create mode 100644 flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java


[incubator-iotdb] 03/06: add docs and rename util classes.

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit bee90f7b14fc11aea989b44b6bf57fcf85285069
Author: 仲炜 <zw...@alibaba-inc.com>
AuthorDate: Sun Mar 29 15:38:27 2020 +0800

    add docs and rename util classes.
---
 .../main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java | 5 ++++-
 .../java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java     | 5 ++++-
 .../java/org/apache/iotdb/flink/{Utils.java => TsFlieUtils.java} | 5 ++++-
 .../org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java     | 3 +++
 .../apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java  | 9 ++++-----
 .../{tool/TsFileWriteTool.java => util/TsFileWriteUtil.java}     | 6 +++---
 6 files changed, 22 insertions(+), 11 deletions(-)

diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java
index 04df1df..6f01b75 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java
@@ -34,11 +34,14 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
+/**
+ * The example of reading TsFile via Flink DataSet API.
+ */
 public class FlinkTsFileBatchSource {
 
 	public static void main(String[] args) throws Exception {
 		String path = "test.tsfile";
-		Utils.writeTsFile(path);
+		TsFlieUtils.writeTsFile(path);
 		new File(path).deleteOnExit();
 		String[] filedNames = {
 			QueryConstant.RESERVED_TIME,
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java
index 3750ea2..83f7de9 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java
@@ -37,11 +37,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
+/**
+ * The example of reading TsFile via Flink DataStream API.
+ */
 public class FlinkTsFileStreamSource {
 
 	public static void main(String[] args) throws IOException {
 		String path = "test.tsfile";
-		Utils.writeTsFile(path);
+		TsFlieUtils.writeTsFile(path);
 		new File(path).deleteOnExit();
 		String[] filedNames = {
 			QueryConstant.RESERVED_TIME,
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/Utils.java b/example/flink/src/main/java/org/apache/iotdb/flink/TsFlieUtils.java
similarity index 96%
rename from example/flink/src/main/java/org/apache/iotdb/flink/Utils.java
rename to example/flink/src/main/java/org/apache/iotdb/flink/TsFlieUtils.java
index e30ce40..ed7b722 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/Utils.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/TsFlieUtils.java
@@ -28,7 +28,10 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.io.File;
 
-public class Utils {
+/**
+ * Utils used to prepare source TsFiles for the examples.
+ */
+public class TsFlieUtils {
 
 	public static void writeTsFile(String path) {
 		try {
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
index 5295cf4..43b6237 100644
--- a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
@@ -22,6 +22,9 @@ package org.apache.iotdb.flink.tsfile.util;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 
+/**
+ * Utils of setting global TSFileConfig.
+ */
 public class TSFileConfigUtil {
 	
 	public static void setGlobalTSFileConfig(TSFileConfig config) {
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
index 0e5f14c..9da84f7 100644
--- a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
@@ -22,10 +22,9 @@ package org.apache.iotdb.flink.tsfile;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
 
 import org.apache.flink.types.Row;
-import org.apache.iotdb.flink.tool.TsFileWriteTool;
+import org.apache.iotdb.flink.util.TsFileWriteUtil;
 
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.constant.QueryConstant;
@@ -57,7 +56,7 @@ public abstract class RowTsFileInputFormatTestBase {
 	public void prepareSourceTsFile() throws Exception {
 		tmpDir = String.join(
 			File.separator,
-			TsFileWriteTool.TMP_DIR,
+			TsFileWriteUtil.TMP_DIR,
 			UUID.randomUUID().toString());
 		new File(tmpDir).mkdirs();
 		sourceTsFilePath1 = String.join(
@@ -66,8 +65,8 @@ public abstract class RowTsFileInputFormatTestBase {
 		sourceTsFilePath2 = String.join(
 			File.separator,
 			tmpDir, "source2.tsfile");
-		TsFileWriteTool.create1(sourceTsFilePath1);
-		TsFileWriteTool.create2(sourceTsFilePath2);
+		TsFileWriteUtil.create1(sourceTsFilePath1);
+		TsFileWriteUtil.create2(sourceTsFilePath2);
 	}
 
 	@After
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tool/TsFileWriteTool.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java
similarity index 98%
rename from flink-tsfile/src/test/java/org/apache/iotdb/flink/tool/TsFileWriteTool.java
rename to flink-tsfile/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java
index bc65ee2..8dd413c 100644
--- a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tool/TsFileWriteTool.java
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.flink.tool;
+package org.apache.iotdb.flink.util;
 
 import java.io.File;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -30,9 +30,9 @@ import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 /**
- * An example of writing data to TsFile
+ * Utils used to prepare TsFiles for testing.
  */
-public class TsFileWriteTool {
+public class TsFileWriteUtil {
 
 	public static final String TMP_DIR = "target";
 


[incubator-iotdb] 04/06: address comments and push to test travis changes.

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 4652631edcfe26479e750ce99a5663f547912a4d
Author: 仲炜 <zw...@alibaba-inc.com>
AuthorDate: Wed Apr 1 17:08:47 2020 +0800

    address comments and push to test travis changes.
---
 .travis.yml                                                            | 2 +-
 example/flink/pom.xml                                                  | 2 +-
 {flink-tsfile => flink-tsfile-connector}/README.md                     | 2 +-
 {flink-tsfile => flink-tsfile-connector}/pom.xml                       | 3 +--
 .../src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java   | 0
 .../main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java    | 0
 .../src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java | 0
 .../main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java | 0
 .../java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java | 0
 .../java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java   | 0
 .../org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java    | 0
 .../src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java     | 0
 pom.xml                                                                | 2 +-
 13 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 3924e33..6b91e6d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -137,7 +137,7 @@ matrix:
         # now, grafana has no tests; spark-* tests are written by scala
         - mvn post-integration-test -Pcode-coverage -Pcoveralls -pl '!site','!distribution' -am
       after_success:
-        - mvn coveralls:report -DserviceName=travis_ci -pl tsfile,server,jdbc,client,session,hive-connector,flink-iotdb-connector
+        - mvn coveralls:report -DserviceName=travis_ci -pl tsfile,server,jdbc,client,session,hive-connector,flink-iotdb-connector,flink-tsfile-connector
     - os: linux
       name: website checker
       dist: xenial
diff --git a/example/flink/pom.xml b/example/flink/pom.xml
index f82d550..e46e834 100644
--- a/example/flink/pom.xml
+++ b/example/flink/pom.xml
@@ -48,7 +48,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
-            <artifactId>flink-tsfile</artifactId>
+            <artifactId>flink-tsfile-connector</artifactId>
             <version>${project.version}</version>
         </dependency>
     </dependencies>
diff --git a/flink-tsfile/README.md b/flink-tsfile-connector/README.md
similarity index 97%
rename from flink-tsfile/README.md
rename to flink-tsfile-connector/README.md
index 1f3f28c..a571de7 100644
--- a/flink-tsfile/README.md
+++ b/flink-tsfile-connector/README.md
@@ -23,7 +23,7 @@
 ## 1. About TsFile-Flink-Connector
 
 TsFile-Flink-Connector implements the support of Flink for external data sources of Tsfile type. 
-This enables users to read, write and query Tsfile by Flink via DataStream/DataSet API.
+This enables users to read Tsfile by Flink via DataStream/DataSet API.
 
 With this connector, you can
 * load a single TsFile or multiple TsFiles(only for DataSet), from either the local file system or hdfs, into Flink
diff --git a/flink-tsfile/pom.xml b/flink-tsfile-connector/pom.xml
similarity index 96%
rename from flink-tsfile/pom.xml
rename to flink-tsfile-connector/pom.xml
index 0a66fa2..3879873 100644
--- a/flink-tsfile/pom.xml
+++ b/flink-tsfile-connector/pom.xml
@@ -27,9 +27,8 @@
         <version>0.10.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
-    <artifactId>flink-tsfile</artifactId>
+    <artifactId>flink-tsfile-connector</artifactId>
     <packaging>jar</packaging>
-    <name>IoTDB Flink-TsFile</name>
     <dependencies>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java
similarity index 100%
rename from flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java
rename to flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java
similarity index 100%
rename from flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java
rename to flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
similarity index 100%
rename from flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
rename to flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
similarity index 100%
rename from flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
rename to flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java
similarity index 100%
rename from flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java
rename to flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java
similarity index 100%
rename from flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java
rename to flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
similarity index 100%
rename from flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
rename to flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java
similarity index 100%
rename from flink-tsfile/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java
rename to flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java
diff --git a/pom.xml b/pom.xml
index 7b5b081..647466f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,9 +88,9 @@
         <module>example</module>
         <module>grafana</module>
         <module>spark-tsfile</module>
-        <module>flink-tsfile</module>
         <module>hadoop</module>
         <module>spark-iotdb-connector</module>
+        <module>flink-tsfile-connector</module>
         <module>flink-iotdb-connector</module>
         <module>distribution</module>
         <module>hive-connector</module>


[incubator-iotdb] 01/06: add TsFileInputFormat for reading TsFiles in flink.

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 0f782ad90b14ee0481d13e36cf144ead404c615c
Author: 仲炜 <zw...@alibaba-inc.com>
AuthorDate: Thu Mar 26 17:35:54 2020 +0800

    add TsFileInputFormat for reading TsFiles in flink.
---
 example/flink/README.md                            |   7 +
 example/flink/pom.xml                              |  10 +
 .../apache/iotdb/flink/FlinkTsFileBatchSource.java |  77 +++++++
 .../iotdb/flink/FlinkTsFileStreamSource.java       |  81 +++++++
 .../main/java/org/apache/iotdb/flink/Utils.java    |  69 ++++++
 flink-tsfile/README.md                             |  93 ++++++++
 {example/flink => flink-tsfile}/pom.xml            |  20 +-
 .../apache/iotdb/flink/tsfile/RowRecordParser.java |  46 ++++
 .../iotdb/flink/tsfile/RowRowRecordParser.java     | 120 +++++++++++
 .../iotdb/flink/tsfile/TsFileInputFormat.java      | 168 +++++++++++++++
 .../iotdb/flink/tsfile/util/TSFileConfigUtil.java  |  62 ++++++
 .../apache/iotdb/flink/tool/TsFileWriteTool.java   | 234 +++++++++++++++++++++
 .../flink/tsfile/RowTsFileInputFormatITCase.java   | 109 ++++++++++
 .../flink/tsfile/RowTsFileInputFormatTest.java     |  89 ++++++++
 .../flink/tsfile/RowTsFileInputFormatTestBase.java | 119 +++++++++++
 pom.xml                                            |   2 +
 .../iotdb/tsfile/common/conf/TSFileConfig.java     |   3 +-
 .../iotdb/tsfile/read/expression/IExpression.java  |   4 +-
 .../tsfile/read/expression/QueryExpression.java    |   3 +-
 19 files changed, 1308 insertions(+), 8 deletions(-)

diff --git a/example/flink/README.md b/example/flink/README.md
index fa3b867..56b92e4 100644
--- a/example/flink/README.md
+++ b/example/flink/README.md
@@ -28,3 +28,10 @@ The example is to show how to send data to a IoTDB server from a Flink job.
 ## Usage
 
 * Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to launch the local iotDB server and run the flink job on local mini cluster.
+
+# TsFile-Flink-Connector Example
+
+## Usage
+
+* Run `org.apache.iotdb.flink.FlinkTsFileBatchSource.java` to create a tsfile and read it via a flink DataSet job on local mini cluster.
+* Run `org.apache.iotdb.flink.FlinkTsFileStreamSource.java` to create a tsfile and read it via a flink DataStream job on local mini cluster.
diff --git a/example/flink/pom.xml b/example/flink/pom.xml
index efcd34d..f82d550 100644
--- a/example/flink/pom.xml
+++ b/example/flink/pom.xml
@@ -41,5 +41,15 @@
             <artifactId>iotdb-server</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>tsfile</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>flink-tsfile</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java
new file mode 100644
index 0000000..04df1df
--- /dev/null
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.flink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.iotdb.flink.tsfile.RowRowRecordParser;
+import org.apache.iotdb.flink.tsfile.TsFileInputFormat;
+import org.apache.iotdb.tsfile.common.constant.QueryConstant;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkTsFileBatchSource {
+
+	public static void main(String[] args) throws Exception {
+		String path = "test.tsfile";
+		Utils.writeTsFile(path);
+		new File(path).deleteOnExit();
+		String[] filedNames = {
+			QueryConstant.RESERVED_TIME,
+			"device_1.sensor_1",
+			"device_1.sensor_2",
+			"device_1.sensor_3",
+			"device_2.sensor_1",
+			"device_2.sensor_2",
+			"device_2.sensor_3"
+		};
+		TypeInformation[] typeInformations = new TypeInformation[] {
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG
+		};
+		List<Path> paths = Arrays.stream(filedNames)
+			.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+			.map(Path::new)
+			.collect(Collectors.toList());
+		RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+		QueryExpression queryExpression = QueryExpression.create(paths, null);
+		RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+		TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		inputFormat.setFilePath(path);
+		DataSet<Row> source = env.createInput(inputFormat);
+		List<String> result = source.map(Row::toString).collect();
+		for (String s : result) {
+			System.out.println(s);
+		}
+	}
+}
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java
new file mode 100644
index 0000000..3750ea2
--- /dev/null
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.flink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.iotdb.flink.tsfile.RowRowRecordParser;
+import org.apache.iotdb.flink.tsfile.TsFileInputFormat;
+import org.apache.iotdb.tsfile.common.constant.QueryConstant;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkTsFileStreamSource {
+
+	public static void main(String[] args) throws IOException {
+		String path = "test.tsfile";
+		Utils.writeTsFile(path);
+		new File(path).deleteOnExit();
+		String[] filedNames = {
+			QueryConstant.RESERVED_TIME,
+			"device_1.sensor_1",
+			"device_1.sensor_2",
+			"device_1.sensor_3",
+			"device_2.sensor_1",
+			"device_2.sensor_2",
+			"device_2.sensor_3"
+		};
+		TypeInformation[] typeInformations = new TypeInformation[] {
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG,
+			Types.LONG
+		};
+		List<Path> paths = Arrays.stream(filedNames)
+			.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+			.map(Path::new)
+			.collect(Collectors.toList());
+		RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+		QueryExpression queryExpression = QueryExpression.create(paths, null);
+		RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+		TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+		StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+		inputFormat.setFilePath("source.tsfile");
+		DataStream<Row> source = senv.createInput(inputFormat);
+		DataStream<String> rowString = source.map(Row::toString);
+		Iterator<String> result = DataStreamUtils.collect(rowString);
+		while (result.hasNext()) {
+			System.out.println(result.next());
+		}
+	}
+}
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/Utils.java b/example/flink/src/main/java/org/apache/iotdb/flink/Utils.java
new file mode 100644
index 0000000..e30ce40
--- /dev/null
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/Utils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.flink;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.io.File;
+
+public class Utils {
+
+	public static void writeTsFile(String path) {
+		try {
+			File f = FSFactoryProducer.getFSFactory().getFile(path);
+			if (f.exists()) {
+				f.delete();
+			}
+			TsFileWriter tsFileWriter = new TsFileWriter(f);
+
+			// add measurements into file schema
+			tsFileWriter
+				.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.RLE));
+			tsFileWriter
+				.addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.RLE));
+			tsFileWriter
+				.addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.RLE));
+
+			// construct TSRecord
+			for (int i = 0; i < 100; i++) {
+				TSRecord tsRecord = new TSRecord(i, "device_" + (i % 4));
+				DataPoint dPoint1 = new LongDataPoint("sensor_1", i);
+				DataPoint dPoint2 = new LongDataPoint("sensor_2", i);
+				DataPoint dPoint3 = new LongDataPoint("sensor_3", i);
+				tsRecord.addTuple(dPoint1);
+				tsRecord.addTuple(dPoint2);
+				tsRecord.addTuple(dPoint3);
+
+				// write TSRecord
+				tsFileWriter.write(tsRecord);
+			}
+
+			tsFileWriter.close();
+		} catch (Throwable e) {
+			e.printStackTrace();
+			System.out.println(e.getMessage());
+		}
+	}
+}
diff --git a/flink-tsfile/README.md b/flink-tsfile/README.md
new file mode 100644
index 0000000..1f3f28c
--- /dev/null
+++ b/flink-tsfile/README.md
@@ -0,0 +1,93 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+# TsFile-Flink-Connector User Guide
+
+## 1. About TsFile-Flink-Connector
+
+TsFile-Flink-Connector implements the support of Flink for external data sources of TsFile type.
+This enables users to read, write and query TsFile by Flink via DataStream/DataSet API.
+
+With this connector, you can
+* load a single TsFile or multiple TsFiles (only for DataSet), from either the local file system or hdfs, into Flink
+* load all files in a specific directory, from either the local file system or hdfs, into Flink
+
+## 2. Quick Start
+
+### TsFileInputFormat Example
+
+1. create TsFileInputFormat with default RowRowRecordParser.
+
+```java
+String[] fieldNames = {
+	QueryConstant.RESERVED_TIME,
+	"device_1.sensor_1",
+	"device_1.sensor_2",
+	"device_1.sensor_3",
+	"device_2.sensor_1",
+	"device_2.sensor_2",
+	"device_2.sensor_3"
+};
+TypeInformation[] typeInformations = new TypeInformation[] {
+	Types.LONG,
+	Types.FLOAT,
+	Types.INT,
+	Types.INT,
+	Types.FLOAT,
+	Types.INT,
+	Types.INT
+};
+List<Path> paths = Arrays.stream(fieldNames)
+	.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+	.map(Path::new)
+	.collect(Collectors.toList());
+RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
+QueryExpression queryExpression = QueryExpression.create(paths, null);
+RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+TsFileInputFormat inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+```
+
+2. Read data from the input format and print to stdout:
+
+DataStream:
+
+```java
+StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataStream<Row> source = senv.createInput(inputFormat);
+DataStream<String> rowString = source.map(Row::toString);
+Iterator<String> result = DataStreamUtils.collect(rowString);
+while (result.hasNext()) {
+	System.out.println(result.next());
+}
+```
+
+DataSet:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataSet<Row> source = env.createInput(inputFormat);
+List<String> result = source.map(Row::toString).collect();
+for (String s : result) {
+	System.out.println(s);
+}
+```
+
diff --git a/example/flink/pom.xml b/flink-tsfile/pom.xml
similarity index 72%
copy from example/flink/pom.xml
copy to flink-tsfile/pom.xml
index efcd34d..0a66fa2 100644
--- a/example/flink/pom.xml
+++ b/flink-tsfile/pom.xml
@@ -23,23 +23,33 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.iotdb</groupId>
-        <artifactId>iotdb-examples</artifactId>
+        <artifactId>iotdb-parent</artifactId>
         <version>0.10.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
-    <artifactId>flink-example</artifactId>
-    <name>IoTDB-Flink Examples</name>
+    <artifactId>flink-tsfile</artifactId>
     <packaging>jar</packaging>
+    <name>IoTDB Flink-TsFile</name>
     <dependencies>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
-            <artifactId>flink-iotdb-connector</artifactId>
+            <artifactId>tsfile</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
-            <artifactId>iotdb-server</artifactId>
+            <artifactId>hadoop-tsfile</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java
new file mode 100644
index 0000000..72741a9
--- /dev/null
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import java.io.Serializable;
+
+/**
+ * RowRecordParser parses the RowRecord objects read from TsFile into the user desired format.
+ * If the accurate type information of parse result can not be extracted from the result type class automatically
+ * (e.g. Row, Tuple, etc.), the {@link ResultTypeQueryable} interface needs to be implemented to provide the type
+ * information explicitly.
+ *
+ * @param <T> The type of the parse result.
+ */
+public interface RowRecordParser<T> extends Serializable {
+
+	/**
+	 * Parse the row record into type T. The param `reuse` is recommended to use for reducing the creation of new
+	 * objects.
+	 *
+	 * <p>Implementations must be serializable, as the parser is shipped to the
+	 * Flink workers together with the input format.
+	 *
+	 * @param rowRecord The input row record.
+	 * @param reuse The object could be reused.
+	 * @return The parsed result.
+	 */
+	T parse(RowRecord rowRecord, T reuse);
+}
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java
new file mode 100644
index 0000000..0ea9bce
--- /dev/null
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.iotdb.tsfile.common.constant.QueryConstant;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * The parser that parses the RowRecord objects read from TsFile into Flink Row object.
+ */
+public class RowRowRecordParser implements RowRecordParser<Row>, ResultTypeQueryable<Row> {
+
+	// For each output row position, the index of the field to read from the
+	// RowRecord; a negative value marks the timestamp field.
+	private final int[] indexMapping;
+	// Type information of the produced rows, reported via getProducedType().
+	private final RowTypeInfo rowTypeInfo;
+
+	/**
+	 * Creates a parser with an explicit field mapping. Prefer the
+	 * {@link #create(RowTypeInfo, List)} factory, which derives the mapping from field names.
+	 *
+	 * @param indexMapping For each output row position, the RowRecord field index to read,
+	 *                     or a negative value to emit the record timestamp instead.
+	 * @param rowTypeInfo The type information of the produced rows.
+	 */
+	public RowRowRecordParser(int[] indexMapping, RowTypeInfo rowTypeInfo) {
+		this.indexMapping = indexMapping;
+		this.rowTypeInfo = rowTypeInfo;
+	}
+
+	@Override
+	public Row parse(RowRecord rowRecord, Row reuse) {
+		List<Field> fields = rowRecord.getFields();
+		for (int i = 0; i < indexMapping.length; i++) {
+			if (indexMapping[i] < 0) {
+				// The negative index is treated as the marker of timestamp.
+				reuse.setField(i, rowRecord.getTimestamp());
+			} else {
+				reuse.setField(i, toSqlValue(fields.get(indexMapping[i])));
+			}
+		}
+		return reuse;
+	}
+
+	/**
+	 * Converts a TsFile {@link Field} into the corresponding Java value,
+	 * or null when the field (or its data type) is absent.
+	 */
+	private Object toSqlValue(Field field) {
+		if (field == null) {
+			return null;
+		} else if (field.getDataType() == null) {
+			return null;
+		} else {
+			switch (field.getDataType()) {
+				case BOOLEAN:
+					return field.getBoolV();
+				case INT32:
+					return field.getIntV();
+				case INT64:
+					return field.getLongV();
+				case FLOAT:
+					return field.getFloatV();
+				case DOUBLE:
+					return field.getDoubleV();
+				case TEXT:
+					return field.getStringValue();
+				default:
+					throw new UnsupportedOperationException(
+						String.format("Unsupported type %s", field.getDataType()));
+			}
+		}
+	}
+
+	@Override
+	public TypeInformation<Row> getProducedType() {
+		return rowTypeInfo;
+	}
+
+	/**
+	 * Creates RowRowRecordParser from output RowTypeInfo and selected series in the RowRecord. The row field "time"
+	 * will be used to store the timestamp value. The other row fields store the values of the same field names of
+	 * the RowRecord.
+	 *
+	 * @param outputRowTypeInfo The RowTypeInfo of the output row.
+	 * @param selectedSeries The selected series in the RowRecord.
+	 * @return The RowRowRecordParser.
+	 * @throws IllegalArgumentException if a non-time row field is not among the selected series.
+	 */
+	public static RowRowRecordParser create(RowTypeInfo outputRowTypeInfo, List<Path> selectedSeries) {
+		List<String> selectedSeriesNames = selectedSeries.stream().map(Path::toString).collect(Collectors.toList());
+		String[] rowFieldNames = outputRowTypeInfo.getFieldNames();
+		int[] indexMapping = new int[outputRowTypeInfo.getArity()];
+		for (int i = 0; i < outputRowTypeInfo.getArity(); i++) {
+			if (!QueryConstant.RESERVED_TIME.equals(rowFieldNames[i])) {
+				// Map the row field to the series of the same (full path) name.
+				int index = selectedSeriesNames.indexOf(rowFieldNames[i]);
+				if (index >= 0) {
+					indexMapping[i] = index;
+				} else {
+					throw new IllegalArgumentException(rowFieldNames[i] + " is not found in selected series.");
+				}
+			} else {
+				// marked as timestamp field.
+				indexMapping[i] = -1;
+			}
+		}
+		return new RowRowRecordParser(indexMapping, outputRowTypeInfo);
+	}
+}
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
new file mode 100644
index 0000000..d58d38d
--- /dev/null
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.iotdb.flink.tsfile.util.TSFileConfigUtil;
+import org.apache.iotdb.hadoop.fileSystem.HDFSInput;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+/**
+ * Input format that reads TsFiles. Users need to provide a {@link RowRecordParser} used to parse the raw data read
+ * from TsFiles into the type T.
+ *
+ * @param <T> The output type of this input format.
+ */
+public class TsFileInputFormat<T> extends FileInputFormat<T> implements ResultTypeQueryable<T> {
+
+	// The query used to select the series to read from each split.
+	private final QueryExpression expression;
+	// Parses the RowRecords produced by the query into the output type T.
+	private final RowRecordParser<T> parser;
+	// Optional TsFile configuration; if present it is applied globally in open().
+	@Nullable
+	private final TSFileConfig config;
+
+	// Runtime state, rebuilt on the worker in configure()/open().
+	private transient org.apache.hadoop.conf.Configuration hadoopConf = null;
+	private transient ReadOnlyTsFile readTsFile = null;
+	private transient QueryDataSet queryDataSet = null;
+
+	/**
+	 * Creates a TsFileInputFormat.
+	 *
+	 * @param path The path of the file(s) to read, or null to set it later via setFilePath().
+	 * @param expression The query used to select the series to read.
+	 * @param parser The parser converting RowRecords into the output type T.
+	 * @param config Optional TsFile configuration, applied process-wide when a split is opened.
+	 */
+	public TsFileInputFormat(
+			@Nullable String path,
+			QueryExpression expression,
+			RowRecordParser<T> parser,
+			@Nullable TSFileConfig config) {
+		super(path != null ? new Path(path) : null);
+		this.expression = expression;
+		this.parser = parser;
+		this.config = config;
+	}
+
+	/** Convenience constructor without a custom TsFile configuration. */
+	public TsFileInputFormat(@Nullable String path, QueryExpression expression, RowRecordParser<T> parser) {
+		this(path, expression, parser, null);
+	}
+
+	/** Convenience constructor; the file path must be set later via setFilePath(). */
+	public TsFileInputFormat(QueryExpression expression, RowRecordParser<T> parser) {
+		this(null, expression, parser, null);
+	}
+
+	@Override
+	public void configure(Configuration flinkConfiguration) {
+		super.configure(flinkConfiguration);
+		// Derive the Hadoop configuration from the Flink configuration for HDFS access.
+		hadoopConf = HadoopUtils.getHadoopConfiguration(flinkConfiguration);
+	}
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+		if (config != null) {
+			// NOTE(review): this mutates the process-wide TSFileConfig singleton;
+			// concurrent tasks using different configs in one JVM would interfere —
+			// confirm this is intended.
+			TSFileConfigUtil.setGlobalTSFileConfig(config);
+		}
+		TsFileInput in;
+		try {
+			if (currentSplit.getPath().getFileSystem().isDistributedFS()) {
+				// HDFS
+				in = new HDFSInput(new org.apache.hadoop.fs.Path(new URI(currentSplit.getPath().getPath())),
+					hadoopConf);
+			} else {
+				// Local File System
+				in = new DefaultTsFileInput(Paths.get(currentSplit.getPath().getPath()));
+			}
+		} catch (URISyntaxException e) {
+			throw new FlinkRuntimeException(e);
+		}
+		TsFileSequenceReader reader = new TsFileSequenceReader(in);
+		readTsFile = new ReadOnlyTsFile(reader);
+		queryDataSet = readTsFile.query(
+			// The query method call will change the content of the param query expression,
+			// the original query expression should not be passed to the query method as it may
+			// be used several times.
+			QueryExpression.create(expression.getSelectedSeries(), expression.getExpression()),
+			currentSplit.getStart(),
+			currentSplit.getStart() + currentSplit.getLength());
+	}
+
+	@Override
+	public void close() throws IOException {
+		super.close();
+		// Closing ReadOnlyTsFile also releases the underlying reader/input.
+		if (readTsFile != null) {
+			readTsFile.close();
+			readTsFile = null;
+		}
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return !queryDataSet.hasNext();
+	}
+
+	@Override
+	public T nextRecord(T t) throws IOException {
+		// Assumes reachedEnd() returned false, i.e. queryDataSet.hasNext() is true.
+		RowRecord rowRecord = queryDataSet.next();
+		return parser.parse(rowRecord, t);
+	}
+
+	@Override
+	public boolean supportsMultiPaths() {
+		return true;
+	}
+
+	/** @return The query expression this input format was created with. */
+	public QueryExpression getExpression() {
+		return expression;
+	}
+
+	/** @return The parser converting RowRecords into the output type. */
+	public RowRecordParser<T> getParser() {
+		return parser;
+	}
+
+	/** @return The optional TsFile configuration applied when opening splits. */
+	public Optional<TSFileConfig> getConfig() {
+		return Optional.ofNullable(config);
+	}
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		// Prefer the type info supplied by the parser itself; otherwise let Flink
+		// extract it from the parser's generic signature.
+		if (this.getParser() instanceof ResultTypeQueryable) {
+			return ((ResultTypeQueryable) this.getParser()).getProducedType();
+		} else {
+			return TypeExtractor.createTypeInfo(
+				RowRecordParser.class,
+				this.getParser().getClass(), 0, null, null);
+		}
+	}
+}
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
new file mode 100644
index 0000000..5295cf4
--- /dev/null
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile.util;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+public class TSFileConfigUtil {
+	
+	/**
+	 * Copies every setting of the given config onto the process-wide
+	 * {@link TSFileConfig} singleton obtained from {@link TSFileDescriptor}.
+	 *
+	 * <p>NOTE(review): this mutates global state shared by all TsFile readers and
+	 * writers in the JVM. Also, any new setting added to TSFileConfig must be
+	 * mirrored here, or it will silently not be propagated — presumably that is
+	 * what the accompanying test guards against; confirm.
+	 *
+	 * @param config The configuration whose values are applied globally.
+	 */
+	public static void setGlobalTSFileConfig(TSFileConfig config) {
+		TSFileConfig globalConfig = TSFileDescriptor.getInstance().getConfig();
+
+		globalConfig.setBatchSize(config.getBatchSize());
+		globalConfig.setBloomFilterErrorRate(config.getBloomFilterErrorRate());
+		globalConfig.setCompressor(config.getCompressor().toString());
+		globalConfig.setCoreSitePath(config.getCoreSitePath());
+		globalConfig.setDeltaBlockSize(config.getDeltaBlockSize());
+		globalConfig.setDfsClientFailoverProxyProvider(config.getDfsClientFailoverProxyProvider());
+		globalConfig.setDfsHaAutomaticFailoverEnabled(config.isDfsHaAutomaticFailoverEnabled());
+		globalConfig.setDfsHaNamenodes(config.getDfsHaNamenodes());
+		globalConfig.setDfsNameServices(config.getDfsNameServices());
+		globalConfig.setDftSatisfyRate(config.getDftSatisfyRate());
+		globalConfig.setEndian(config.getEndian());
+		globalConfig.setFloatPrecision(config.getFloatPrecision());
+		globalConfig.setFreqType(config.getFreqType());
+		globalConfig.setGroupSizeInByte(config.getGroupSizeInByte());
+		globalConfig.setHdfsIp(config.getHdfsIp());
+		globalConfig.setHdfsPort(config.getHdfsPort());
+		globalConfig.setHdfsSitePath(config.getHdfsSitePath());
+		globalConfig.setKerberosKeytabFilePath(config.getKerberosKeytabFilePath());
+		globalConfig.setKerberosPrincipal(config.getKerberosPrincipal());
+		globalConfig.setMaxNumberOfPointsInPage(config.getMaxNumberOfPointsInPage());
+		globalConfig.setMaxStringLength(config.getMaxStringLength());
+		globalConfig.setPageCheckSizeThreshold(config.getPageCheckSizeThreshold());
+		globalConfig.setPageSizeInByte(config.getPageSizeInByte());
+		globalConfig.setPlaMaxError(config.getPlaMaxError());
+		globalConfig.setRleBitWidth(config.getRleBitWidth());
+		globalConfig.setSdtMaxError(config.getSdtMaxError());
+		globalConfig.setTimeEncoder(config.getTimeEncoder());
+		globalConfig.setTimeSeriesDataType(config.getTimeSeriesDataType());
+		globalConfig.setTSFileStorageFs(config.getTSFileStorageFs().toString());
+		globalConfig.setUseKerberos(config.isUseKerberos());
+		globalConfig.setValueEncoder(config.getValueEncoder());
+	}
+}
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tool/TsFileWriteTool.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tool/TsFileWriteTool.java
new file mode 100644
index 0000000..bc65ee2
--- /dev/null
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tool/TsFileWriteTool.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tool;
+
+import java.io.File;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+/**
+ * An example of writing data to TsFile
+ */
+public class TsFileWriteTool {
+
+	// Directory where the test fixture files are written.
+	public static final String TMP_DIR = "target";
+
+	/**
+	 * Writes the first test fixture TsFile: device_1 with timestamps 1..8 (5 missing)
+	 * and device_2 with timestamps 1..2, over three measurements
+	 * (sensor_1 FLOAT, sensor_2/sensor_3 INT32). Some records deliberately omit a
+	 * sensor so that reads see missing fields. Any existing file is deleted first.
+	 *
+	 * @param tsfilePath The path of the TsFile to create.
+	 * @throws Exception If writing the file fails.
+	 */
+	public static void create1(String tsfilePath) throws Exception {
+		File f = new File(tsfilePath);
+		if (f.exists()) {
+			f.delete();
+		}
+		TsFileWriter tsFileWriter = new TsFileWriter(f);
+
+		// add measurements into file schema
+		tsFileWriter
+			.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
+		tsFileWriter
+			.addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT32, TSEncoding.TS_2DIFF));
+		tsFileWriter
+			.addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT32, TSEncoding.TS_2DIFF));
+
+		// construct TSRecord
+		TSRecord tsRecord = new TSRecord(1, "device_1");
+		DataPoint dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
+		DataPoint dPoint2 = new IntDataPoint("sensor_2", 20);
+		DataPoint dPoint3;
+		// sensor_3 is intentionally omitted at this timestamp (missing field).
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+
+		// write a TSRecord to TsFile
+		tsFileWriter.write(tsRecord);
+
+		// timestamp 2: sensor_1 omitted this time.
+		tsRecord = new TSRecord(2, "device_1");
+		dPoint2 = new IntDataPoint("sensor_2", 20);
+		dPoint3 = new IntDataPoint("sensor_3", 50);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		// timestamp 3: sensor_3 omitted.
+		tsRecord = new TSRecord(3, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 1.4f);
+		dPoint2 = new IntDataPoint("sensor_2", 21);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(4, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 20);
+		dPoint3 = new IntDataPoint("sensor_3", 51);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		// timestamp 5 is intentionally skipped (gap in the series).
+		tsRecord = new TSRecord(6, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 7.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 10);
+		dPoint3 = new IntDataPoint("sensor_3", 11);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(7, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 6.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 20);
+		dPoint3 = new IntDataPoint("sensor_3", 21);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(8, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 9.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 30);
+		dPoint3 = new IntDataPoint("sensor_3", 31);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		// device_2: two fully populated records at timestamps 1 and 2.
+		tsRecord = new TSRecord(1, "device_2");
+		dPoint1 = new FloatDataPoint("sensor_1", 2.3f);
+		dPoint2 = new IntDataPoint("sensor_2", 11);
+		dPoint3 = new IntDataPoint("sensor_3", 19);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(2, "device_2");
+		dPoint1 = new FloatDataPoint("sensor_1", 25.4f);
+		dPoint2 = new IntDataPoint("sensor_2", 10);
+		dPoint3 = new IntDataPoint("sensor_3", 21);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		// close TsFile
+		tsFileWriter.close();
+	}
+
+	/**
+	 * Writes the second test fixture TsFile with the same schema and data pattern as
+	 * {@link #create1(String)}, but shifted to later timestamps (device_1: 9..16 with
+	 * 13 missing, device_2: 9..10). Any existing file is deleted first.
+	 *
+	 * @param tsfilePath The path of the TsFile to create.
+	 * @throws Exception If writing the file fails.
+	 */
+	public static void create2(String tsfilePath) throws Exception {
+		File f = new File(tsfilePath);
+		if (f.exists()) {
+			f.delete();
+		}
+		TsFileWriter tsFileWriter = new TsFileWriter(f);
+
+		// add measurements into file schema
+		tsFileWriter
+			.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
+		tsFileWriter
+			.addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT32, TSEncoding.TS_2DIFF));
+		tsFileWriter
+			.addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT32, TSEncoding.TS_2DIFF));
+
+		// construct TSRecord
+		TSRecord tsRecord = new TSRecord(9, "device_1");
+		DataPoint dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
+		DataPoint dPoint2 = new IntDataPoint("sensor_2", 20);
+		DataPoint dPoint3;
+		// sensor_3 is intentionally omitted at this timestamp (missing field).
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+
+		// write a TSRecord to TsFile
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(10, "device_1");
+		dPoint2 = new IntDataPoint("sensor_2", 20);
+		dPoint3 = new IntDataPoint("sensor_3", 50);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(11, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 1.4f);
+		dPoint2 = new IntDataPoint("sensor_2", 21);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(12, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 20);
+		dPoint3 = new IntDataPoint("sensor_3", 51);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		// timestamp 13 is intentionally skipped (gap in the series).
+		tsRecord = new TSRecord(14, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 7.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 10);
+		dPoint3 = new IntDataPoint("sensor_3", 11);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(15, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 6.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 20);
+		dPoint3 = new IntDataPoint("sensor_3", 21);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(16, "device_1");
+		dPoint1 = new FloatDataPoint("sensor_1", 9.2f);
+		dPoint2 = new IntDataPoint("sensor_2", 30);
+		dPoint3 = new IntDataPoint("sensor_3", 31);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(9, "device_2");
+		dPoint1 = new FloatDataPoint("sensor_1", 2.3f);
+		dPoint2 = new IntDataPoint("sensor_2", 11);
+		dPoint3 = new IntDataPoint("sensor_3", 19);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		tsRecord = new TSRecord(10, "device_2");
+		dPoint1 = new FloatDataPoint("sensor_1", 25.4f);
+		dPoint2 = new IntDataPoint("sensor_2", 10);
+		dPoint3 = new IntDataPoint("sensor_3", 21);
+		tsRecord.addTuple(dPoint1);
+		tsRecord.addTuple(dPoint2);
+		tsRecord.addTuple(dPoint3);
+		tsFileWriter.write(tsRecord);
+
+		// close TsFile
+		tsFileWriter.close();
+	}
+}
\ No newline at end of file
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java
new file mode 100644
index 0000000..5198152
--- /dev/null
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterators;
+import java.util.stream.StreamSupport;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * ITCases for RowTsFileInputFormat.
+ *
+ * <p>Runs the input format through full Flink batch and stream executions
+ * against the two TsFiles prepared by the base class.
+ */
+public class RowTsFileInputFormatITCase extends RowTsFileInputFormatTestBase {
+
+	// Batch execution environment.
+	private ExecutionEnvironment env;
+	// Streaming execution environment.
+	private StreamExecutionEnvironment senv;
+
+	@Before
+	public void prepareEnv() {
+		// Parallelism 1 keeps each run single-threaded; results are still sorted
+		// before comparison, so this mainly simplifies debugging.
+		env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		senv = StreamExecutionEnvironment.getExecutionEnvironment();
+		senv.setParallelism(1);
+	}
+
+	/** Reads two explicitly listed TsFiles with the DataSet (batch) API. */
+	@Test
+	public void testBatchExecution() throws Exception {
+		// read multiple files
+		TsFileInputFormat<Row> inputFormat = prepareInputFormat(null);
+		inputFormat.setFilePaths(sourceTsFilePath1, sourceTsFilePath2);
+		DataSet<Row> source = env.createInput(inputFormat);
+		List<String> result = source.map(Row::toString).collect();
+		// Row order across files/splits is not guaranteed; sort before comparing.
+		Collections.sort(result);
+		String[] expected = {
+			"1,1.2,20,null,2.3,11,19",
+			"10,null,20,50,25.4,10,21",
+			"11,1.4,21,null,null,null,null",
+			"12,1.2,20,51,null,null,null",
+			"14,7.2,10,11,null,null,null",
+			"15,6.2,20,21,null,null,null",
+			"16,9.2,30,31,null,null,null",
+			"2,null,20,50,25.4,10,21",
+			"3,1.4,21,null,null,null,null",
+			"4,1.2,20,51,null,null,null",
+			"6,7.2,10,11,null,null,null",
+			"7,6.2,20,21,null,null,null",
+			"8,9.2,30,31,null,null,null",
+			"9,1.2,20,null,2.3,11,19"
+		};
+		assertArrayEquals(expected, result.toArray());
+	}
+
+	/** Reads every TsFile under the temp directory with the DataStream API. */
+	@Test
+	public void testStreamExecution() throws Exception {
+		// read files in a directory
+		TsFileInputFormat<Row> inputFormat = prepareInputFormat(tmpDir);
+		DataStream<Row> source = senv.createInput(inputFormat);
+		Iterator<String> rowStringIterator = DataStreamUtils.collect(source.map(Row::toString));
+		// Drain the (unordered) iterator and sort, mirroring the batch test.
+		String[] result = StreamSupport.stream(
+			Spliterators.spliteratorUnknownSize(rowStringIterator, 0),
+			false).sorted().toArray(String[]::new);
+		String[] expected = {
+			"1,1.2,20,null,2.3,11,19",
+			"10,null,20,50,25.4,10,21",
+			"11,1.4,21,null,null,null,null",
+			"12,1.2,20,51,null,null,null",
+			"14,7.2,10,11,null,null,null",
+			"15,6.2,20,21,null,null,null",
+			"16,9.2,30,31,null,null,null",
+			"2,null,20,50,25.4,10,21",
+			"3,1.4,21,null,null,null,null",
+			"4,1.2,20,51,null,null,null",
+			"6,7.2,10,11,null,null,null",
+			"7,6.2,20,21,null,null,null",
+			"8,9.2,30,31,null,null,null",
+			"9,1.2,20,null,2.3,11,19"
+		};
+		assertArrayEquals(expected, result);
+	}
+}
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java
new file mode 100644
index 0000000..62f4a9b
--- /dev/null
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.Row;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for RowTsFileInputFormat.
+ */
+public class RowTsFileInputFormatTest extends RowTsFileInputFormatTestBase {
+
+	/**
+	 * Reads a single TsFile split by split and checks both the emitted rows and
+	 * that the custom TSFileConfig is applied when a split is opened.
+	 */
+	@Test
+	public void testReadData() throws IOException {
+		TsFileInputFormat<Row> inputFormat = prepareInputFormat(sourceTsFilePath1);
+
+		List<String> actual = new ArrayList<>();
+
+		try {
+			inputFormat.configure(new Configuration());
+			inputFormat.openInputFormat();
+			FileInputSplit[] inputSplits = inputFormat.createInputSplits(2);
+			Row reuse = rowTypeInfo.createSerializer(new ExecutionConfig()).createInstance();
+			for (FileInputSplit inputSplit : inputSplits) {
+				try {
+					inputFormat.open(inputSplit);
+					// open() should push the per-format TSFileConfig into the global config.
+					assertEquals(config.getBatchSize(), TSFileDescriptor.getInstance().getConfig().getBatchSize());
+					while (!inputFormat.reachedEnd()) {
+						Row row = inputFormat.nextRecord(reuse);
+						actual.add(row.toString());
+					}
+				} finally {
+					inputFormat.close();
+				}
+			}
+		} finally {
+			inputFormat.closeInputFormat();
+		}
+
+		String[] expected = {
+			"1,1.2,20,null,2.3,11,19",
+			"2,null,20,50,25.4,10,21",
+			"3,1.4,21,null,null,null,null",
+			"4,1.2,20,51,null,null,null",
+			"6,7.2,10,11,null,null,null",
+			"7,6.2,20,21,null,null,null",
+			"8,9.2,30,31,null,null,null"
+		};
+		// JUnit's assertArrayEquals signature is (expecteds, actuals); the arguments
+		// were originally swapped, which makes failure messages report the expected
+		// and actual values the wrong way around.
+		assertArrayEquals(expected, actual.toArray());
+	}
+
+	/** Checks that the getters return exactly what was passed at construction. */
+	@Test
+	public void testGetter() {
+		TsFileInputFormat<Row> inputFormat = prepareInputFormat(sourceTsFilePath1);
+
+		assertEquals(parser, inputFormat.getParser());
+		assertEquals(queryExpression, inputFormat.getExpression());
+		assertEquals(config, inputFormat.getConfig().get());
+		assertEquals(parser.getProducedType(), inputFormat.getProducedType());
+	}
+}
+}
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
new file mode 100644
index 0000000..0e5f14c
--- /dev/null
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.flink.types.Row;
+import org.apache.iotdb.flink.tool.TsFileWriteTool;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.constant.QueryConstant;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Base class of the TsFileInputFormat tests.
+ *
+ * <p>Creates two source TsFiles in a fresh per-test temp directory before each
+ * test and removes the files and the directory afterwards.
+ */
+public abstract class RowTsFileInputFormatTestBase {
+
+	// Per-test temp directory holding both source files.
+	protected String tmpDir;
+	protected String sourceTsFilePath1;
+	protected String sourceTsFilePath2;
+	// Row schema of the prepared input format: time column + 2 devices x 3 sensors.
+	protected RowTypeInfo rowTypeInfo;
+	// Non-default TSFileConfig so tests can verify it is actually applied.
+	protected TSFileConfig config;
+	protected RowRowRecordParser parser;
+	protected QueryExpression queryExpression;
+
+	@Before
+	public void prepareSourceTsFile() throws Exception {
+		// Random UUID keeps concurrently running tests from sharing a directory.
+		tmpDir = String.join(
+			File.separator,
+			TsFileWriteTool.TMP_DIR,
+			UUID.randomUUID().toString());
+		new File(tmpDir).mkdirs();
+		sourceTsFilePath1 = String.join(
+			File.separator,
+			tmpDir, "source1.tsfile");
+		sourceTsFilePath2 = String.join(
+			File.separator,
+			tmpDir, "source2.tsfile");
+		TsFileWriteTool.create1(sourceTsFilePath1);
+		TsFileWriteTool.create2(sourceTsFilePath2);
+	}
+
+	@After
+	public void removeSourceTsFile() {
+		File sourceTsFile1 = new File(sourceTsFilePath1);
+		if (sourceTsFile1.exists()) {
+			sourceTsFile1.delete();
+		}
+		File sourceTsFile2 = new File(sourceTsFilePath2);
+		if (sourceTsFile2.exists()) {
+			sourceTsFile2.delete();
+		}
+		// File.delete() on a directory only succeeds once it is empty, i.e.
+		// after both files above have been removed.
+		File tmpDirFile = new File(tmpDir);
+		if (tmpDirFile.exists()) {
+			tmpDirFile.delete();
+		}
+	}
+
+	/**
+	 * Builds a TsFileInputFormat selecting all six sensor series of the test
+	 * schema, backed by a TSFileConfig with a non-default batch size (500).
+	 *
+	 * @param filePath file or directory to read; may be null when the caller
+	 *                 sets the paths on the returned format afterwards
+	 */
+	protected TsFileInputFormat<Row> prepareInputFormat(String filePath) {
+		String[] filedNames = {
+			QueryConstant.RESERVED_TIME,
+			"device_1.sensor_1",
+			"device_1.sensor_2",
+			"device_1.sensor_3",
+			"device_2.sensor_1",
+			"device_2.sensor_2",
+			"device_2.sensor_3"
+		};
+		TypeInformation[] typeInformations = new TypeInformation[] {
+			Types.LONG,
+			Types.FLOAT,
+			Types.INT,
+			Types.INT,
+			Types.FLOAT,
+			Types.INT,
+			Types.INT
+		};
+		// The reserved time column is part of the row type but not a selectable series.
+		List<Path> paths = Arrays.stream(filedNames)
+			.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+			.map(Path::new)
+			.collect(Collectors.toList());
+		config = new TSFileConfig();
+		config.setBatchSize(500);
+		rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+		queryExpression = QueryExpression.create(paths, null);
+		parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+		return new TsFileInputFormat<>(filePath, queryExpression, parser, config);
+	}
+}
diff --git a/pom.xml b/pom.xml
index 1c4c80a..7b5b081 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
         <module>example</module>
         <module>grafana</module>
         <module>spark-tsfile</module>
+        <module>flink-tsfile</module>
         <module>hadoop</module>
         <module>spark-iotdb-connector</module>
         <module>flink-iotdb-connector</module>
@@ -108,6 +109,7 @@
         <logback.version>1.1.11</logback.version>
         <joda.version>2.9.9</joda.version>
         <spark.version>2.4.3</spark.version>
+        <flink.version>1.10.0</flink.version>
         <common.io.version>2.5</common.io.version>
         <commons.collections4>4.0</commons.collections4>
         <thrift.version>0.13.0</thrift.version>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index 9a97c65..c5c8627 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.common.conf;
 
+import java.io.Serializable;
 import java.nio.charset.Charset;
 
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -27,7 +28,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSType;
  * TSFileConfig is a configure class. Every variables is public and has default
  * value.
  */
-public class TSFileConfig {
+public class TSFileConfig implements Serializable {
 
   // Memory configuration
   public static final int RLE_MIN_REPEATED_NUM = 8;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
index 4f7feae..93bd706 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
@@ -18,7 +18,9 @@
  */
 package org.apache.iotdb.tsfile.read.expression;
 
-public interface IExpression {
+import java.io.Serializable;
+
+public interface IExpression extends Serializable {
 
   ExpressionType getType();
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java
index 9d068fc..996d918 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java
@@ -18,12 +18,13 @@
  */
 package org.apache.iotdb.tsfile.read.expression;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 
-public class QueryExpression {
+public class QueryExpression implements Serializable {
 
   private List<Path> selectedSeries;
   private List<TSDataType> dataTypes;


[incubator-iotdb] 06/06: fix compile error caused by rebase and add a new test to help maintain the TSFileConfigUtil.

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit b24cdde30f48f94d1f31c4abc1e73e00c2ceef18
Author: 仲炜 <zw...@alibaba-inc.com>
AuthorDate: Wed Apr 8 14:39:25 2020 +0800

    fix compile error caused by rebase and add a new test to help maintain the TSFileConfigUtil.
---
 .../iotdb/flink/tsfile/TsFileInputFormat.java      |  4 +-
 .../iotdb/flink/tsfile/util/TSFileConfigUtil.java  |  2 +-
 .../util/TSFileConfigUtilCompletenessTest.java     | 84 ++++++++++++++++++++++
 3 files changed, 87 insertions(+), 3 deletions(-)

diff --git a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
index 1ecb4bb..7b94040 100644
--- a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
+++ b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 
 import javax.annotation.Nullable;
@@ -102,7 +102,7 @@ public class TsFileInputFormat<T> extends FileInputFormat<T> implements ResultTy
 					hadoopConf);
 			} else {
 				// Local File System
-				in = new DefaultTsFileInput(Paths.get(currentSplit.getPath().toUri()));
+				in = new LocalTsFileInput(Paths.get(currentSplit.getPath().toUri()));
 			}
 		} catch (URISyntaxException e) {
 			throw new FlinkRuntimeException(e);
diff --git a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
index 43b6237..41596ea 100644
--- a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
+++ b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
@@ -58,7 +58,7 @@ public class TSFileConfigUtil {
 		globalConfig.setSdtMaxError(config.getSdtMaxError());
 		globalConfig.setTimeEncoder(config.getTimeEncoder());
 		globalConfig.setTimeSeriesDataType(config.getTimeSeriesDataType());
-		globalConfig.setTSFileStorageFs(config.getTSFileStorageFs().toString());
+		globalConfig.setTSFileStorageFs(config.getTSFileStorageFs());
 		globalConfig.setUseKerberos(config.isUseKerberos());
 		globalConfig.setValueEncoder(config.getValueEncoder());
 	}
diff --git a/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java
new file mode 100644
index 0000000..2af10b6
--- /dev/null
+++ b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.util;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test is used to help maintain the {@link org.apache.iotdb.flink.tsfile.util.TSFileConfigUtil}.
+ *
+ * <p>It fails whenever TSFileConfig gains a public setter that TSFileConfigUtil
+ * does not yet forward, listing the missing setter names in the failure message.
+ */
+public class TSFileConfigUtilCompletenessTest {
+
+	@Test
+	public void testTSFileConfigUtilCompleteness() {
+		// Setters already forwarded by TSFileConfigUtil; kept sorted for easy diffing.
+		String[] addedSetters = {
+			"setBatchSize",
+			"setBloomFilterErrorRate",
+			"setCompressor",
+			"setCoreSitePath",
+			"setDeltaBlockSize",
+			"setDfsClientFailoverProxyProvider",
+			"setDfsHaAutomaticFailoverEnabled",
+			"setDfsHaNamenodes",
+			"setDfsNameServices",
+			"setDftSatisfyRate",
+			"setEndian",
+			"setFloatPrecision",
+			"setFreqType",
+			"setGroupSizeInByte",
+			"setHdfsIp",
+			"setHdfsPort",
+			"setHdfsSitePath",
+			"setKerberosKeytabFilePath",
+			"setKerberosPrincipal",
+			"setMaxNumberOfPointsInPage",
+			"setMaxStringLength",
+			"setPageCheckSizeThreshold",
+			"setPageSizeInByte",
+			"setPlaMaxError",
+			"setRleBitWidth",
+			"setSdtMaxError",
+			"setTimeEncoder",
+			"setTimeSeriesDataType",
+			"setTSFileStorageFs",
+			"setUseKerberos",
+			"setValueEncoder"
+		};
+		// Build the lookup set once; the original re-wrapped the array with
+		// Arrays.asList (O(n) contains) for every method of TSFileConfig.
+		Set<String> knownSetters = Arrays.stream(addedSetters).collect(Collectors.toSet());
+		Set<String> newSetters = Arrays.stream(TSFileConfig.class.getMethods())
+			.map(Method::getName)
+			.filter(s -> s.startsWith("set"))
+			.filter(s -> !knownSetters.contains(s))
+			.collect(Collectors.toSet());
+		assertTrue(
+			String.format(
+				"New setters in TSFileConfig are detected, please add them to " +
+				"org.apache.iotdb.flink.tsfile.util.TSFileConfigUtil. The setters need to be added: %s",
+				newSetters),
+			newSetters.isEmpty());
+	}
+}


[incubator-iotdb] 02/06: try to fix windows test

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2feb0ce1ae8c54883ba2fb64d99e83897e3e1c01
Author: 仲炜 <zw...@alibaba-inc.com>
AuthorDate: Sat Mar 28 15:27:09 2020 +0800

    try to fix windows test
---
 .../src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
index d58d38d..1ecb4bb 100644
--- a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
@@ -102,7 +102,7 @@ public class TsFileInputFormat<T> extends FileInputFormat<T> implements ResultTy
 					hadoopConf);
 			} else {
 				// Local File System
-				in = new DefaultTsFileInput(Paths.get(currentSplit.getPath().getPath()));
+				in = new DefaultTsFileInput(Paths.get(currentSplit.getPath().toUri()));
 			}
 		} catch (URISyntaxException e) {
 			throw new FlinkRuntimeException(e);


[incubator-iotdb] 05/06: fix compile error caused by rebase

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 4846b4aa8a4f61713cf3fd45c64ef7356b294e8b
Author: 仲炜 <zw...@alibaba-inc.com>
AuthorDate: Wed Apr 1 19:39:32 2020 +0800

    fix compile error caused by rebase
---
 .../java/org/apache/iotdb/flink/TsFlieUtils.java   | 16 ++++++------
 .../apache/iotdb/flink/util/TsFileWriteUtil.java   | 30 ++++++++++------------
 2 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/TsFlieUtils.java b/example/flink/src/main/java/org/apache/iotdb/flink/TsFlieUtils.java
index ed7b722..5b1ffba 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/TsFlieUtils.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/TsFlieUtils.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
 
 import java.io.File;
 
@@ -33,21 +34,20 @@ import java.io.File;
  */
 public class TsFlieUtils {
 
+	private static final String DEFAULT_TEMPLATE = "template";
+
 	public static void writeTsFile(String path) {
 		try {
 			File f = FSFactoryProducer.getFSFactory().getFile(path);
 			if (f.exists()) {
 				f.delete();
 			}
-			TsFileWriter tsFileWriter = new TsFileWriter(f);
+			Schema schema = new Schema();
+			schema.extendTemplate(DEFAULT_TEMPLATE, new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
+			schema.extendTemplate(DEFAULT_TEMPLATE, new MeasurementSchema("sensor_2", TSDataType.INT32, TSEncoding.TS_2DIFF));
+			schema.extendTemplate(DEFAULT_TEMPLATE, new MeasurementSchema("sensor_3", TSDataType.INT32, TSEncoding.TS_2DIFF));
 
-			// add measurements into file schema
-			tsFileWriter
-				.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.RLE));
-			tsFileWriter
-				.addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.RLE));
-			tsFileWriter
-				.addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.RLE));
+			TsFileWriter tsFileWriter = new TsFileWriter(f, schema);
 
 			// construct TSRecord
 			for (int i = 0; i < 100; i++) {
diff --git a/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java
index 8dd413c..21d9ab7 100644
--- a/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java
+++ b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
 
 /**
  * Utils used to prepare TsFiles for testing.
@@ -35,21 +36,19 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 public class TsFileWriteUtil {
 
 	public static final String TMP_DIR = "target";
+	private static final String DEFAULT_TEMPLATE = "template";
 
 	public static void create1(String tsfilePath) throws Exception {
 		File f = new File(tsfilePath);
 		if (f.exists()) {
 			f.delete();
 		}
-		TsFileWriter tsFileWriter = new TsFileWriter(f);
+		Schema schema = new Schema();
+		schema.extendTemplate(DEFAULT_TEMPLATE, new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
+		schema.extendTemplate(DEFAULT_TEMPLATE, new MeasurementSchema("sensor_2", TSDataType.INT32, TSEncoding.TS_2DIFF));
+		schema.extendTemplate(DEFAULT_TEMPLATE, new MeasurementSchema("sensor_3", TSDataType.INT32, TSEncoding.TS_2DIFF));
 
-		// add measurements into file schema
-		tsFileWriter
-			.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
-		tsFileWriter
-			.addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT32, TSEncoding.TS_2DIFF));
-		tsFileWriter
-			.addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT32, TSEncoding.TS_2DIFF));
+		TsFileWriter tsFileWriter = new TsFileWriter(f, schema);
 
 		// construct TSRecord
 		TSRecord tsRecord = new TSRecord(1, "device_1");
@@ -139,15 +138,12 @@ public class TsFileWriteUtil {
 		if (f.exists()) {
 			f.delete();
 		}
-		TsFileWriter tsFileWriter = new TsFileWriter(f);
-
-		// add measurements into file schema
-		tsFileWriter
-			.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
-		tsFileWriter
-			.addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT32, TSEncoding.TS_2DIFF));
-		tsFileWriter
-			.addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT32, TSEncoding.TS_2DIFF));
+		Schema schema = new Schema();
+		schema.extendTemplate(DEFAULT_TEMPLATE, new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
+		schema.extendTemplate(DEFAULT_TEMPLATE, new MeasurementSchema("sensor_2", TSDataType.INT32, TSEncoding.TS_2DIFF));
+		schema.extendTemplate(DEFAULT_TEMPLATE, new MeasurementSchema("sensor_3", TSDataType.INT32, TSEncoding.TS_2DIFF));
+
+		TsFileWriter tsFileWriter = new TsFileWriter(f, schema);
 
 		// construct TSRecord
 		TSRecord tsRecord = new TSRecord(9, "device_1");