You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/08 12:25:43 UTC
[incubator-iotdb] 01/06: add TsFileInputFormat for reading TsFiles
in flink.
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 0f782ad90b14ee0481d13e36cf144ead404c615c
Author: 仲炜 <zw...@alibaba-inc.com>
AuthorDate: Thu Mar 26 17:35:54 2020 +0800
add TsFileInputFormat for reading TsFiles in flink.
---
example/flink/README.md | 7 +
example/flink/pom.xml | 10 +
.../apache/iotdb/flink/FlinkTsFileBatchSource.java | 77 +++++++
.../iotdb/flink/FlinkTsFileStreamSource.java | 81 +++++++
.../main/java/org/apache/iotdb/flink/Utils.java | 69 ++++++
flink-tsfile/README.md | 93 ++++++++
{example/flink => flink-tsfile}/pom.xml | 20 +-
.../apache/iotdb/flink/tsfile/RowRecordParser.java | 46 ++++
.../iotdb/flink/tsfile/RowRowRecordParser.java | 120 +++++++++++
.../iotdb/flink/tsfile/TsFileInputFormat.java | 168 +++++++++++++++
.../iotdb/flink/tsfile/util/TSFileConfigUtil.java | 62 ++++++
.../apache/iotdb/flink/tool/TsFileWriteTool.java | 234 +++++++++++++++++++++
.../flink/tsfile/RowTsFileInputFormatITCase.java | 109 ++++++++++
.../flink/tsfile/RowTsFileInputFormatTest.java | 89 ++++++++
.../flink/tsfile/RowTsFileInputFormatTestBase.java | 119 +++++++++++
pom.xml | 2 +
.../iotdb/tsfile/common/conf/TSFileConfig.java | 3 +-
.../iotdb/tsfile/read/expression/IExpression.java | 4 +-
.../tsfile/read/expression/QueryExpression.java | 3 +-
19 files changed, 1308 insertions(+), 8 deletions(-)
diff --git a/example/flink/README.md b/example/flink/README.md
index fa3b867..56b92e4 100644
--- a/example/flink/README.md
+++ b/example/flink/README.md
@@ -28,3 +28,10 @@ The example is to show how to send data to a IoTDB server from a Flink job.
## Usage
* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to launch the local iotDB server and run the flink job on local mini cluster.
+
+# TsFile-Flink-Connector Example
+
+## Usage
+
+* Run `org.apache.iotdb.flink.FlinkTsFileBatchSource.java` to create a tsfile and read it via a flink DataSet job on local mini cluster.
+* Run `org.apache.iotdb.flink.FlinkTsFileStreamSource.java` to create a tsfile and read it via a flink DataStream job on local mini cluster.
diff --git a/example/flink/pom.xml b/example/flink/pom.xml
index efcd34d..f82d550 100644
--- a/example/flink/pom.xml
+++ b/example/flink/pom.xml
@@ -41,5 +41,15 @@
<artifactId>iotdb-server</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>tsfile</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>flink-tsfile</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java
new file mode 100644
index 0000000..04df1df
--- /dev/null
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSource.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.flink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.iotdb.flink.tsfile.RowRowRecordParser;
+import org.apache.iotdb.flink.tsfile.TsFileInputFormat;
+import org.apache.iotdb.tsfile.common.constant.QueryConstant;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkTsFileBatchSource {
+
+ public static void main(String[] args) throws Exception {
+ String path = "test.tsfile";
+ Utils.writeTsFile(path);
+ new File(path).deleteOnExit();
+ String[] filedNames = {
+ QueryConstant.RESERVED_TIME,
+ "device_1.sensor_1",
+ "device_1.sensor_2",
+ "device_1.sensor_3",
+ "device_2.sensor_1",
+ "device_2.sensor_2",
+ "device_2.sensor_3"
+ };
+ TypeInformation[] typeInformations = new TypeInformation[] {
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG
+ };
+ List<Path> paths = Arrays.stream(filedNames)
+ .filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+ .map(Path::new)
+ .collect(Collectors.toList());
+ RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+ QueryExpression queryExpression = QueryExpression.create(paths, null);
+ RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+ TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ inputFormat.setFilePath(path);
+ DataSet<Row> source = env.createInput(inputFormat);
+ List<String> result = source.map(Row::toString).collect();
+ for (String s : result) {
+ System.out.println(s);
+ }
+ }
+}
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java
new file mode 100644
index 0000000..3750ea2
--- /dev/null
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSource.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.flink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.iotdb.flink.tsfile.RowRowRecordParser;
+import org.apache.iotdb.flink.tsfile.TsFileInputFormat;
+import org.apache.iotdb.tsfile.common.constant.QueryConstant;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkTsFileStreamSource {
+
+ public static void main(String[] args) throws IOException {
+ String path = "test.tsfile";
+ Utils.writeTsFile(path);
+ new File(path).deleteOnExit();
+ String[] filedNames = {
+ QueryConstant.RESERVED_TIME,
+ "device_1.sensor_1",
+ "device_1.sensor_2",
+ "device_1.sensor_3",
+ "device_2.sensor_1",
+ "device_2.sensor_2",
+ "device_2.sensor_3"
+ };
+ TypeInformation[] typeInformations = new TypeInformation[] {
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG,
+ Types.LONG
+ };
+ List<Path> paths = Arrays.stream(filedNames)
+ .filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+ .map(Path::new)
+ .collect(Collectors.toList());
+ RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+ QueryExpression queryExpression = QueryExpression.create(paths, null);
+ RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+ TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+ StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+ inputFormat.setFilePath("source.tsfile");
+ DataStream<Row> source = senv.createInput(inputFormat);
+ DataStream<String> rowString = source.map(Row::toString);
+ Iterator<String> result = DataStreamUtils.collect(rowString);
+ while (result.hasNext()) {
+ System.out.println(result.next());
+ }
+ }
+}
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/Utils.java b/example/flink/src/main/java/org/apache/iotdb/flink/Utils.java
new file mode 100644
index 0000000..e30ce40
--- /dev/null
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/Utils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.flink;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.io.File;
+
+public class Utils {
+
+ public static void writeTsFile(String path) {
+ try {
+ File f = FSFactoryProducer.getFSFactory().getFile(path);
+ if (f.exists()) {
+ f.delete();
+ }
+ TsFileWriter tsFileWriter = new TsFileWriter(f);
+
+ // add measurements into file schema
+ tsFileWriter
+ .addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.RLE));
+ tsFileWriter
+ .addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.RLE));
+ tsFileWriter
+ .addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.RLE));
+
+ // construct TSRecord
+ for (int i = 0; i < 100; i++) {
+ TSRecord tsRecord = new TSRecord(i, "device_" + (i % 4));
+ DataPoint dPoint1 = new LongDataPoint("sensor_1", i);
+ DataPoint dPoint2 = new LongDataPoint("sensor_2", i);
+ DataPoint dPoint3 = new LongDataPoint("sensor_3", i);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+
+ // write TSRecord
+ tsFileWriter.write(tsRecord);
+ }
+
+ tsFileWriter.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.out.println(e.getMessage());
+ }
+ }
+}
diff --git a/flink-tsfile/README.md b/flink-tsfile/README.md
new file mode 100644
index 0000000..1f3f28c
--- /dev/null
+++ b/flink-tsfile/README.md
@@ -0,0 +1,93 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+# TsFile-Flink-Connector User Guide
+
+## 1. About TsFile-Flink-Connector
+
+TsFile-Flink-Connector adds Flink support for TsFile as an external data source.
+This enables users to read, write and query TsFiles with Flink via the DataStream/DataSet API.
+
+With this connector, you can
+* load a single TsFile or multiple TsFiles (only for DataSet), from either the local file system or HDFS, into Flink
+* load all files in a specific directory, from either the local file system or HDFS, into Flink
+
+## 2. Quick Start
+
+### TsFileInputFormat Example
+
+1. Create a `TsFileInputFormat` with the default `RowRowRecordParser`.
+
+```java
+String[] filedNames = {
+ QueryConstant.RESERVED_TIME,
+ "device_1.sensor_1",
+ "device_1.sensor_2",
+ "device_1.sensor_3",
+ "device_2.sensor_1",
+ "device_2.sensor_2",
+ "device_2.sensor_3"
+};
+TypeInformation[] typeInformations = new TypeInformation[] {
+ Types.LONG,
+ Types.FLOAT,
+ Types.INT,
+ Types.INT,
+ Types.FLOAT,
+ Types.INT,
+ Types.INT
+};
+List<Path> paths = Arrays.stream(filedNames)
+ .filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+ .map(Path::new)
+ .collect(Collectors.toList());
+RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+QueryExpression queryExpression = QueryExpression.create(paths, null);
+RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+TsFileInputFormat inputFormat = new TsFileInputFormat<>(queryExpression, parser);
+```
+
+2. Read data from the input format and print to stdout:
+
+DataStream:
+
+```java
+StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataStream<Row> source = senv.createInput(inputFormat);
+DataStream<String> rowString = source.map(Row::toString);
+Iterator<String> result = DataStreamUtils.collect(rowString);
+while (result.hasNext()) {
+ System.out.println(result.next());
+}
+```
+
+DataSet:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+inputFormat.setFilePath("source.tsfile");
+DataSet<Row> source = env.createInput(inputFormat);
+List<String> result = source.map(Row::toString).collect();
+for (String s : result) {
+ System.out.println(s);
+}
+```
+
diff --git a/example/flink/pom.xml b/flink-tsfile/pom.xml
similarity index 72%
copy from example/flink/pom.xml
copy to flink-tsfile/pom.xml
index efcd34d..0a66fa2 100644
--- a/example/flink/pom.xml
+++ b/flink-tsfile/pom.xml
@@ -23,23 +23,33 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-examples</artifactId>
+ <artifactId>iotdb-parent</artifactId>
<version>0.10.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>flink-example</artifactId>
- <name>IoTDB-Flink Examples</name>
+ <artifactId>flink-tsfile</artifactId>
<packaging>jar</packaging>
+ <name>IoTDB Flink-TsFile</name>
<dependencies>
<dependency>
<groupId>org.apache.iotdb</groupId>
- <artifactId>flink-iotdb-connector</artifactId>
+ <artifactId>tsfile</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-server</artifactId>
+ <artifactId>hadoop-tsfile</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java
new file mode 100644
index 0000000..72741a9
--- /dev/null
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRecordParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import java.io.Serializable;
+
/**
 * RowRecordParser parses the RowRecord objects read from TsFile into the user desired format.
 * If the accurate type information of the parse result can not be extracted from the result type class automatically
 * (e.g. Row, Tuple, etc.), the {@link ResultTypeQueryable} interface needs to be implemented to provide the type
 * information explicitly.
 *
 * <p>Implementations must be serializable ({@code extends Serializable}) so that parser
 * instances can be distributed with the job.
 *
 * @param <T> The type of the parse result.
 */
public interface RowRecordParser<T> extends Serializable {

  /**
   * Parse the row record into type T. The param `reuse` is recommended to use for reducing the creation of new
   * objects: implementations may fill in and return {@code reuse} instead of allocating a fresh
   * result object for every record.
   *
   * @param rowRecord The input row record.
   * @param reuse The object could be reused.
   * @return The parsed result.
   */
  T parse(RowRecord rowRecord, T reuse);
}
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java
new file mode 100644
index 0000000..0ea9bce
--- /dev/null
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/RowRowRecordParser.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.iotdb.tsfile.common.constant.QueryConstant;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * The parser that parses the RowRecord objects read from TsFile into Flink Row object.
+ */
+public class RowRowRecordParser implements RowRecordParser<Row>, ResultTypeQueryable<Row> {
+
+ private final int[] indexMapping;
+ private final RowTypeInfo rowTypeInfo;
+
+ public RowRowRecordParser(int[] indexMapping, RowTypeInfo rowTypeInfo) {
+ this.indexMapping = indexMapping;
+ this.rowTypeInfo = rowTypeInfo;
+ }
+
+ @Override
+ public Row parse(RowRecord rowRecord, Row reuse) {
+ List<Field> fields = rowRecord.getFields();
+ for (int i = 0; i < indexMapping.length; i++) {
+ if (indexMapping[i] < 0) {
+ // The negative index is treated as the marker of timestamp.
+ reuse.setField(i, rowRecord.getTimestamp());
+ } else {
+ reuse.setField(i, toSqlValue(fields.get(indexMapping[i])));
+ }
+ }
+ return reuse;
+ }
+
+ private Object toSqlValue(Field field) {
+ if (field == null) {
+ return null;
+ } else if (field.getDataType() == null) {
+ return null;
+ } else {
+ switch (field.getDataType()) {
+ case BOOLEAN:
+ return field.getBoolV();
+ case INT32:
+ return field.getIntV();
+ case INT64:
+ return field.getLongV();
+ case FLOAT:
+ return field.getFloatV();
+ case DOUBLE:
+ return field.getDoubleV();
+ case TEXT:
+ return field.getStringValue();
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported type %s", field.getDataType()));
+ }
+ }
+ }
+
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ return rowTypeInfo;
+ }
+
+ /**
+ * Creates RowRowRecordParser from output RowTypeInfo and selected series in the RowRecord. The row field "time"
+ * will be used to store the timestamp value. The other row fields store the values of the same field names of
+ * the RowRecord.
+ *
+ * @param outputRowTypeInfo The RowTypeInfo of the output row.
+ * @param selectedSeries The selected series in the RowRecord.
+ * @return The RowRowRecordParser.
+ */
+ public static RowRowRecordParser create(RowTypeInfo outputRowTypeInfo, List<Path> selectedSeries) {
+ List<String> selectedSeriesNames = selectedSeries.stream().map(Path::toString).collect(Collectors.toList());
+ String[] rowFieldNames = outputRowTypeInfo.getFieldNames();
+ int[] indexMapping = new int[outputRowTypeInfo.getArity()];
+ for (int i = 0; i < outputRowTypeInfo.getArity(); i++) {
+ if (!QueryConstant.RESERVED_TIME.equals(rowFieldNames[i])) {
+ int index = selectedSeriesNames.indexOf(rowFieldNames[i]);
+ if (index >= 0) {
+ indexMapping[i] = index;
+ } else {
+ throw new IllegalArgumentException(rowFieldNames[i] + " is not found in selected series.");
+ }
+ } else {
+ // marked as timestamp field.
+ indexMapping[i] = -1;
+ }
+ }
+ return new RowRowRecordParser(indexMapping, outputRowTypeInfo);
+ }
+}
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
new file mode 100644
index 0000000..d58d38d
--- /dev/null
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/TsFileInputFormat.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.iotdb.flink.tsfile.util.TSFileConfigUtil;
+import org.apache.iotdb.hadoop.fileSystem.HDFSInput;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Optional;
+
/**
 * Input format that reads TsFiles. Users need to provide a {@link RowRecordParser} used to parse the raw data read
 * from TsFiles into the type T.
 *
 * <p>Depending on the file system of the current input split, the data is read either through a
 * local {@link DefaultTsFileInput} or through an {@link HDFSInput}.
 *
 * @param <T> The output type of this input format.
 */
public class TsFileInputFormat<T> extends FileInputFormat<T> implements ResultTypeQueryable<T> {

  // Query selecting the series to read; a defensive copy is made before each execution (see open()).
  private final QueryExpression expression;
  // Converts each RowRecord produced by the query into the output type T.
  private final RowRecordParser<T> parser;
  // Optional TsFile configuration installed globally on the reading task (see open()).
  @Nullable
  private final TSFileConfig config;

  // Per-task runtime state, rebuilt in configure()/open(); excluded from serialization.
  private transient org.apache.hadoop.conf.Configuration hadoopConf = null;
  private transient ReadOnlyTsFile readTsFile = null;
  private transient QueryDataSet queryDataSet = null;

  /**
   * Creates a TsFileInputFormat.
   *
   * @param path The file or directory to read; may be null and set later via setFilePath.
   * @param expression The query describing the series to read.
   * @param parser The parser converting RowRecord objects into the output type T.
   * @param config Optional TsFile configuration applied globally before reading; may be null.
   */
  public TsFileInputFormat(
      @Nullable String path,
      QueryExpression expression,
      RowRecordParser<T> parser,
      @Nullable TSFileConfig config) {
    super(path != null ? new Path(path) : null);
    this.expression = expression;
    this.parser = parser;
    this.config = config;
  }

  /** Convenience constructor without a custom TSFileConfig. */
  public TsFileInputFormat(@Nullable String path, QueryExpression expression, RowRecordParser<T> parser) {
    this(path, expression, parser, null);
  }

  /** Convenience constructor without a path (set later via setFilePath) or custom TSFileConfig. */
  public TsFileInputFormat(QueryExpression expression, RowRecordParser<T> parser) {
    this(null, expression, parser, null);
  }

  @Override
  public void configure(Configuration flinkConfiguration) {
    super.configure(flinkConfiguration);
    // Derive the Hadoop configuration from the Flink configuration for potential HDFS reads.
    hadoopConf = HadoopUtils.getHadoopConfiguration(flinkConfiguration);
  }

  @Override
  public void open(FileInputSplit split) throws IOException {
    super.open(split);
    if (config != null) {
      // NOTE(review): this mutates process-global TsFile settings for the whole task JVM.
      TSFileConfigUtil.setGlobalTSFileConfig(config);
    }
    // Pick the TsFileInput implementation matching the split's file system.
    TsFileInput in;
    try {
      if (currentSplit.getPath().getFileSystem().isDistributedFS()) {
        // HDFS
        in = new HDFSInput(new org.apache.hadoop.fs.Path(new URI(currentSplit.getPath().getPath())),
            hadoopConf);
      } else {
        // Local File System
        in = new DefaultTsFileInput(Paths.get(currentSplit.getPath().getPath()));
      }
    } catch (URISyntaxException e) {
      throw new FlinkRuntimeException(e);
    }
    TsFileSequenceReader reader = new TsFileSequenceReader(in);
    readTsFile = new ReadOnlyTsFile(reader);
    queryDataSet = readTsFile.query(
        // The query method call will change the content of the param query expression,
        // the original query expression should not be passed to the query method as it may
        // be used several times.
        QueryExpression.create(expression.getSelectedSeries(), expression.getExpression()),
        currentSplit.getStart(),
        currentSplit.getStart() + currentSplit.getLength());
  }

  @Override
  public void close() throws IOException {
    super.close();
    // Closing readTsFile also releases the underlying TsFileSequenceReader/TsFileInput.
    // NOTE(review): queryDataSet is not reset here; callers must not use reachedEnd()
    // or nextRecord() after close().
    if (readTsFile != null) {
      readTsFile.close();
      readTsFile = null;
    }
  }

  @Override
  public boolean reachedEnd() throws IOException {
    // End of split is reached when the query result is exhausted; requires open() first.
    return !queryDataSet.hasNext();
  }

  @Override
  public T nextRecord(T t) throws IOException {
    // The parser may fill and return the reusable object t instead of allocating a new one.
    RowRecord rowRecord = queryDataSet.next();
    return parser.parse(rowRecord, t);
  }

  @Override
  public boolean supportsMultiPaths() {
    return true;
  }

  public QueryExpression getExpression() {
    return expression;
  }

  public RowRecordParser<T> getParser() {
    return parser;
  }

  public Optional<TSFileConfig> getConfig() {
    return Optional.ofNullable(config);
  }

  @Override
  public TypeInformation<T> getProducedType() {
    // Prefer explicit type information from the parser (e.g. RowRowRecordParser);
    // otherwise fall back to extracting it from the parser's generic signature.
    if (this.getParser() instanceof ResultTypeQueryable) {
      return ((ResultTypeQueryable) this.getParser()).getProducedType();
    } else {
      return TypeExtractor.createTypeInfo(
          RowRecordParser.class,
          this.getParser().getClass(), 0, null, null);
    }
  }
}
diff --git a/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
new file mode 100644
index 0000000..5295cf4
--- /dev/null
+++ b/flink-tsfile/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile.util;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+public class TSFileConfigUtil {
+
+ public static void setGlobalTSFileConfig(TSFileConfig config) {
+ TSFileConfig globalConfig = TSFileDescriptor.getInstance().getConfig();
+
+ globalConfig.setBatchSize(config.getBatchSize());
+ globalConfig.setBloomFilterErrorRate(config.getBloomFilterErrorRate());
+ globalConfig.setCompressor(config.getCompressor().toString());
+ globalConfig.setCoreSitePath(config.getCoreSitePath());
+ globalConfig.setDeltaBlockSize(config.getDeltaBlockSize());
+ globalConfig.setDfsClientFailoverProxyProvider(config.getDfsClientFailoverProxyProvider());
+ globalConfig.setDfsHaAutomaticFailoverEnabled(config.isDfsHaAutomaticFailoverEnabled());
+ globalConfig.setDfsHaNamenodes(config.getDfsHaNamenodes());
+ globalConfig.setDfsNameServices(config.getDfsNameServices());
+ globalConfig.setDftSatisfyRate(config.getDftSatisfyRate());
+ globalConfig.setEndian(config.getEndian());
+ globalConfig.setFloatPrecision(config.getFloatPrecision());
+ globalConfig.setFreqType(config.getFreqType());
+ globalConfig.setGroupSizeInByte(config.getGroupSizeInByte());
+ globalConfig.setHdfsIp(config.getHdfsIp());
+ globalConfig.setHdfsPort(config.getHdfsPort());
+ globalConfig.setHdfsSitePath(config.getHdfsSitePath());
+ globalConfig.setKerberosKeytabFilePath(config.getKerberosKeytabFilePath());
+ globalConfig.setKerberosPrincipal(config.getKerberosPrincipal());
+ globalConfig.setMaxNumberOfPointsInPage(config.getMaxNumberOfPointsInPage());
+ globalConfig.setMaxStringLength(config.getMaxStringLength());
+ globalConfig.setPageCheckSizeThreshold(config.getPageCheckSizeThreshold());
+ globalConfig.setPageSizeInByte(config.getPageSizeInByte());
+ globalConfig.setPlaMaxError(config.getPlaMaxError());
+ globalConfig.setRleBitWidth(config.getRleBitWidth());
+ globalConfig.setSdtMaxError(config.getSdtMaxError());
+ globalConfig.setTimeEncoder(config.getTimeEncoder());
+ globalConfig.setTimeSeriesDataType(config.getTimeSeriesDataType());
+ globalConfig.setTSFileStorageFs(config.getTSFileStorageFs().toString());
+ globalConfig.setUseKerberos(config.isUseKerberos());
+ globalConfig.setValueEncoder(config.getValueEncoder());
+ }
+}
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tool/TsFileWriteTool.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tool/TsFileWriteTool.java
new file mode 100644
index 0000000..bc65ee2
--- /dev/null
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tool/TsFileWriteTool.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tool;
+
+import java.io.File;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+/**
+ * A test utility that writes fixed TsFile fixtures used by the flink-tsfile input format tests.
+ */
+public class TsFileWriteTool {
+
+ public static final String TMP_DIR = "target";
+
+ public static void create1(String tsfilePath) throws Exception {
+ File f = new File(tsfilePath);
+ if (f.exists()) {
+ f.delete();
+ }
+ TsFileWriter tsFileWriter = new TsFileWriter(f);
+
+ // add measurements into file schema
+ tsFileWriter
+ .addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
+ tsFileWriter
+ .addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT32, TSEncoding.TS_2DIFF));
+ tsFileWriter
+ .addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT32, TSEncoding.TS_2DIFF));
+
+ // construct TSRecord
+ TSRecord tsRecord = new TSRecord(1, "device_1");
+ DataPoint dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
+ DataPoint dPoint2 = new IntDataPoint("sensor_2", 20);
+ DataPoint dPoint3;
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+
+ // write a TSRecord to TsFile
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(2, "device_1");
+ dPoint2 = new IntDataPoint("sensor_2", 20);
+ dPoint3 = new IntDataPoint("sensor_3", 50);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(3, "device_1");
+ dPoint1 = new FloatDataPoint("sensor_1", 1.4f);
+ dPoint2 = new IntDataPoint("sensor_2", 21);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(4, "device_1");
+ dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
+ dPoint2 = new IntDataPoint("sensor_2", 20);
+ dPoint3 = new IntDataPoint("sensor_3", 51);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(6, "device_1");
+ dPoint1 = new FloatDataPoint("sensor_1", 7.2f);
+ dPoint2 = new IntDataPoint("sensor_2", 10);
+ dPoint3 = new IntDataPoint("sensor_3", 11);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(7, "device_1");
+ dPoint1 = new FloatDataPoint("sensor_1", 6.2f);
+ dPoint2 = new IntDataPoint("sensor_2", 20);
+ dPoint3 = new IntDataPoint("sensor_3", 21);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(8, "device_1");
+ dPoint1 = new FloatDataPoint("sensor_1", 9.2f);
+ dPoint2 = new IntDataPoint("sensor_2", 30);
+ dPoint3 = new IntDataPoint("sensor_3", 31);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(1, "device_2");
+ dPoint1 = new FloatDataPoint("sensor_1", 2.3f);
+ dPoint2 = new IntDataPoint("sensor_2", 11);
+ dPoint3 = new IntDataPoint("sensor_3", 19);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(2, "device_2");
+ dPoint1 = new FloatDataPoint("sensor_1", 25.4f);
+ dPoint2 = new IntDataPoint("sensor_2", 10);
+ dPoint3 = new IntDataPoint("sensor_3", 21);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ // close TsFile
+ tsFileWriter.close();
+ }
+
+ public static void create2(String tsfilePath) throws Exception {
+ File f = new File(tsfilePath);
+ if (f.exists()) {
+ f.delete();
+ }
+ TsFileWriter tsFileWriter = new TsFileWriter(f);
+
+ // add measurements into file schema
+ tsFileWriter
+ .addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
+ tsFileWriter
+ .addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT32, TSEncoding.TS_2DIFF));
+ tsFileWriter
+ .addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT32, TSEncoding.TS_2DIFF));
+
+ // construct TSRecord
+ TSRecord tsRecord = new TSRecord(9, "device_1");
+ DataPoint dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
+ DataPoint dPoint2 = new IntDataPoint("sensor_2", 20);
+ DataPoint dPoint3;
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+
+ // write a TSRecord to TsFile
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(10, "device_1");
+ dPoint2 = new IntDataPoint("sensor_2", 20);
+ dPoint3 = new IntDataPoint("sensor_3", 50);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(11, "device_1");
+ dPoint1 = new FloatDataPoint("sensor_1", 1.4f);
+ dPoint2 = new IntDataPoint("sensor_2", 21);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(12, "device_1");
+ dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
+ dPoint2 = new IntDataPoint("sensor_2", 20);
+ dPoint3 = new IntDataPoint("sensor_3", 51);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(14, "device_1");
+ dPoint1 = new FloatDataPoint("sensor_1", 7.2f);
+ dPoint2 = new IntDataPoint("sensor_2", 10);
+ dPoint3 = new IntDataPoint("sensor_3", 11);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(15, "device_1");
+ dPoint1 = new FloatDataPoint("sensor_1", 6.2f);
+ dPoint2 = new IntDataPoint("sensor_2", 20);
+ dPoint3 = new IntDataPoint("sensor_3", 21);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(16, "device_1");
+ dPoint1 = new FloatDataPoint("sensor_1", 9.2f);
+ dPoint2 = new IntDataPoint("sensor_2", 30);
+ dPoint3 = new IntDataPoint("sensor_3", 31);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(9, "device_2");
+ dPoint1 = new FloatDataPoint("sensor_1", 2.3f);
+ dPoint2 = new IntDataPoint("sensor_2", 11);
+ dPoint3 = new IntDataPoint("sensor_3", 19);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ tsRecord = new TSRecord(10, "device_2");
+ dPoint1 = new FloatDataPoint("sensor_1", 25.4f);
+ dPoint2 = new IntDataPoint("sensor_2", 10);
+ dPoint3 = new IntDataPoint("sensor_3", 21);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+ tsFileWriter.write(tsRecord);
+
+ // close TsFile
+ tsFileWriter.close();
+ }
+}
\ No newline at end of file
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java
new file mode 100644
index 0000000..5198152
--- /dev/null
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatITCase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterators;
+import java.util.stream.StreamSupport;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * ITCases for RowTsFileInputFormat.
+ */
+public class RowTsFileInputFormatITCase extends RowTsFileInputFormatTestBase {
+
+ private ExecutionEnvironment env;
+ private StreamExecutionEnvironment senv;
+
+ @Before
+ public void prepareEnv() {
+ env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ senv = StreamExecutionEnvironment.getExecutionEnvironment();
+ senv.setParallelism(1);
+ }
+
+ @Test
+ public void testBatchExecution() throws Exception {
+ // read multiple files
+ TsFileInputFormat<Row> inputFormat = prepareInputFormat(null);
+ inputFormat.setFilePaths(sourceTsFilePath1, sourceTsFilePath2);
+ DataSet<Row> source = env.createInput(inputFormat);
+ List<String> result = source.map(Row::toString).collect();
+ Collections.sort(result);
+ String[] expected = {
+ "1,1.2,20,null,2.3,11,19",
+ "10,null,20,50,25.4,10,21",
+ "11,1.4,21,null,null,null,null",
+ "12,1.2,20,51,null,null,null",
+ "14,7.2,10,11,null,null,null",
+ "15,6.2,20,21,null,null,null",
+ "16,9.2,30,31,null,null,null",
+ "2,null,20,50,25.4,10,21",
+ "3,1.4,21,null,null,null,null",
+ "4,1.2,20,51,null,null,null",
+ "6,7.2,10,11,null,null,null",
+ "7,6.2,20,21,null,null,null",
+ "8,9.2,30,31,null,null,null",
+ "9,1.2,20,null,2.3,11,19"
+ };
+ assertArrayEquals(expected, result.toArray());
+ }
+
+ @Test
+ public void testStreamExecution() throws Exception {
+ // read files in a directory
+ TsFileInputFormat<Row> inputFormat = prepareInputFormat(tmpDir);
+ DataStream<Row> source = senv.createInput(inputFormat);
+ Iterator<String> rowStringIterator = DataStreamUtils.collect(source.map(Row::toString));
+ String[] result = StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(rowStringIterator, 0),
+ false).sorted().toArray(String[]::new);
+ String[] expected = {
+ "1,1.2,20,null,2.3,11,19",
+ "10,null,20,50,25.4,10,21",
+ "11,1.4,21,null,null,null,null",
+ "12,1.2,20,51,null,null,null",
+ "14,7.2,10,11,null,null,null",
+ "15,6.2,20,21,null,null,null",
+ "16,9.2,30,31,null,null,null",
+ "2,null,20,50,25.4,10,21",
+ "3,1.4,21,null,null,null,null",
+ "4,1.2,20,51,null,null,null",
+ "6,7.2,10,11,null,null,null",
+ "7,6.2,20,21,null,null,null",
+ "8,9.2,30,31,null,null,null",
+ "9,1.2,20,null,2.3,11,19"
+ };
+ assertArrayEquals(expected, result);
+ }
+}
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java
new file mode 100644
index 0000000..62f4a9b
--- /dev/null
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.Row;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for RowTsFileInputFormat.
+ */
+public class RowTsFileInputFormatTest extends RowTsFileInputFormatTestBase {
+
+ @Test
+ public void testReadData() throws IOException {
+ TsFileInputFormat<Row> inputFormat = prepareInputFormat(sourceTsFilePath1);
+
+ List<String> actual = new ArrayList<>();
+
+ try {
+ inputFormat.configure(new Configuration());
+ inputFormat.openInputFormat();
+ FileInputSplit[] inputSplits = inputFormat.createInputSplits(2);
+ Row reuse = rowTypeInfo.createSerializer(new ExecutionConfig()).createInstance();
+ for (FileInputSplit inputSplit : inputSplits) {
+ try {
+ inputFormat.open(inputSplit);
+ assertEquals(config.getBatchSize(), TSFileDescriptor.getInstance().getConfig().getBatchSize());
+ while (!inputFormat.reachedEnd()) {
+ Row row = inputFormat.nextRecord(reuse);
+ actual.add(row.toString());
+ }
+ } finally {
+ inputFormat.close();
+ }
+ }
+ } finally {
+ inputFormat.closeInputFormat();
+ }
+
+ String[] expected = {
+ "1,1.2,20,null,2.3,11,19",
+ "2,null,20,50,25.4,10,21",
+ "3,1.4,21,null,null,null,null",
+ "4,1.2,20,51,null,null,null",
+ "6,7.2,10,11,null,null,null",
+ "7,6.2,20,21,null,null,null",
+ "8,9.2,30,31,null,null,null"
+ };
+ assertArrayEquals(expected, actual.toArray());
+ }
+
+ @Test
+ public void testGetter() {
+ TsFileInputFormat<Row> inputFormat = prepareInputFormat(sourceTsFilePath1);
+
+ assertEquals(parser, inputFormat.getParser());
+ assertEquals(queryExpression, inputFormat.getExpression());
+ assertEquals(config, inputFormat.getConfig().get());
+ assertEquals(parser.getProducedType(), inputFormat.getProducedType());
+ }
+}
diff --git a/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
new file mode 100644
index 0000000..0e5f14c
--- /dev/null
+++ b/flink-tsfile/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.flink.types.Row;
+import org.apache.iotdb.flink.tool.TsFileWriteTool;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.constant.QueryConstant;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Base class of the TsFileInputFormat tests.
+ */
+public abstract class RowTsFileInputFormatTestBase {
+
+ protected String tmpDir;
+ protected String sourceTsFilePath1;
+ protected String sourceTsFilePath2;
+ protected RowTypeInfo rowTypeInfo;
+ protected TSFileConfig config;
+ protected RowRowRecordParser parser;
+ protected QueryExpression queryExpression;
+
+ @Before
+ public void prepareSourceTsFile() throws Exception {
+ tmpDir = String.join(
+ File.separator,
+ TsFileWriteTool.TMP_DIR,
+ UUID.randomUUID().toString());
+ new File(tmpDir).mkdirs();
+ sourceTsFilePath1 = String.join(
+ File.separator,
+ tmpDir, "source1.tsfile");
+ sourceTsFilePath2 = String.join(
+ File.separator,
+ tmpDir, "source2.tsfile");
+ TsFileWriteTool.create1(sourceTsFilePath1);
+ TsFileWriteTool.create2(sourceTsFilePath2);
+ }
+
+ @After
+ public void removeSourceTsFile() {
+ File sourceTsFile1 = new File(sourceTsFilePath1);
+ if (sourceTsFile1.exists()) {
+ sourceTsFile1.delete();
+ }
+ File sourceTsFile2 = new File(sourceTsFilePath2);
+ if (sourceTsFile2.exists()) {
+ sourceTsFile2.delete();
+ }
+ File tmpDirFile = new File(tmpDir);
+ if (tmpDirFile.exists()) {
+ tmpDirFile.delete();
+ }
+ }
+
+ protected TsFileInputFormat<Row> prepareInputFormat(String filePath) {
+ String[] fieldNames = {
+ QueryConstant.RESERVED_TIME,
+ "device_1.sensor_1",
+ "device_1.sensor_2",
+ "device_1.sensor_3",
+ "device_2.sensor_1",
+ "device_2.sensor_2",
+ "device_2.sensor_3"
+ };
+ TypeInformation[] typeInformations = new TypeInformation[] {
+ Types.LONG,
+ Types.FLOAT,
+ Types.INT,
+ Types.INT,
+ Types.FLOAT,
+ Types.INT,
+ Types.INT
+ };
+ List<Path> paths = Arrays.stream(fieldNames)
+ .filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+ .map(Path::new)
+ .collect(Collectors.toList());
+ config = new TSFileConfig();
+ config.setBatchSize(500);
+ rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
+ queryExpression = QueryExpression.create(paths, null);
+ parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
+ return new TsFileInputFormat<>(filePath, queryExpression, parser, config);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 1c4c80a..7b5b081 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
<module>example</module>
<module>grafana</module>
<module>spark-tsfile</module>
+ <module>flink-tsfile</module>
<module>hadoop</module>
<module>spark-iotdb-connector</module>
<module>flink-iotdb-connector</module>
@@ -108,6 +109,7 @@
<logback.version>1.1.11</logback.version>
<joda.version>2.9.9</joda.version>
<spark.version>2.4.3</spark.version>
+ <flink.version>1.10.0</flink.version>
<common.io.version>2.5</common.io.version>
<commons.collections4>4.0</commons.collections4>
<thrift.version>0.13.0</thrift.version>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index 9a97c65..c5c8627 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.common.conf;
+import java.io.Serializable;
import java.nio.charset.Charset;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -27,7 +28,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSType;
* TSFileConfig is a configure class. Every variables is public and has default
* value.
*/
-public class TSFileConfig {
+public class TSFileConfig implements Serializable {
// Memory configuration
public static final int RLE_MIN_REPEATED_NUM = 8;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
index 4f7feae..93bd706 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/IExpression.java
@@ -18,7 +18,9 @@
*/
package org.apache.iotdb.tsfile.read.expression;
-public interface IExpression {
+import java.io.Serializable;
+
+public interface IExpression extends Serializable {
ExpressionType getType();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java
index 9d068fc..996d918 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/expression/QueryExpression.java
@@ -18,12 +18,13 @@
*/
package org.apache.iotdb.tsfile.read.expression;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
-public class QueryExpression {
+public class QueryExpression implements Serializable {
private List<Path> selectedSeries;
private List<TSDataType> dataTypes;