You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/08 01:41:52 UTC
[incubator-iotdb] branch master updated: Add TsFile writing to HDFS example (#994)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9d77616 Add TsFile writing to HDFS example (#994)
9d77616 is described below
commit 9d77616c80b861be8a13de06986533eee8b8e8be
Author: Zesong Sun <sz...@mails.tsinghua.edu.cn>
AuthorDate: Wed Apr 8 09:41:43 2020 +0800
Add TsFile writing to HDFS example (#994)
* Add TsFile writing to HDFS example
* Rename DefaultTsFileInput to LocalTsFileInput
---
.../iotdb/hadoop/tsfile/TsFileWriteToHDFS.java | 73 ++++++++++++++++++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +-
.../iotdb/tsfile/common/conf/TSFileConfig.java | 4 +-
.../fileInputFactory/LocalFSInputFactory.java | 4 +-
.../fileOutputFactory/LocalFSOutputFactory.java | 4 +-
...faultTsFileInput.java => LocalTsFileInput.java} | 4 +-
.../apache/iotdb/tsfile/write/TsFileWriter.java | 2 +-
.../write/writer/ForceAppendTsFileWriter.java | 2 +-
...ultTsFileOutput.java => LocalTsFileOutput.java} | 11 +---
.../write/writer/RestorableTsFileIOWriter.java | 2 -
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 5 +-
11 files changed, 92 insertions(+), 24 deletions(-)
diff --git a/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java b/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java
new file mode 100644
index 0000000..f44be96
--- /dev/null
+++ b/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.hadoop.tsfile;
+
+import java.io.File;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/** Example that writes a small TsFile (one device, three INT64 sensors) directly to HDFS. */
+public class TsFileWriteToHDFS {
+  private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(TsFileWriteToHDFS.class);
+
+  /** Writes 100 records to the hard-coded HDFS path; failures are logged, not rethrown. */
+  public static void main(String[] args) {
+    // switch TsFile storage to HDFS before asking FSFactoryProducer for the file
+    config.setTSFileStorageFs(FSType.HDFS);
+    String path = "hdfs://localhost:9000/test.tsfile";
+    File f = FSFactoryProducer.getFSFactory().getFile(path);
+    try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+      tsFileWriter.registerTimeseries(new Path(Constant.DEVICE_1, Constant.SENSOR_1),
+          new MeasurementSchema(Constant.SENSOR_1, TSDataType.INT64, TSEncoding.RLE));
+      tsFileWriter.registerTimeseries(new Path(Constant.DEVICE_1, Constant.SENSOR_2),
+          new MeasurementSchema(Constant.SENSOR_2, TSDataType.INT64, TSEncoding.RLE));
+      tsFileWriter.registerTimeseries(new Path(Constant.DEVICE_1, Constant.SENSOR_3),
+          new MeasurementSchema(Constant.SENSOR_3, TSDataType.INT64, TSEncoding.RLE));
+
+      // construct TSRecord: timestamp i, each of the three sensors carrying the value i
+      for (int i = 0; i < 100; i++) {
+        TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_1);
+        DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, i);
+        DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, i);
+        DataPoint dPoint3 = new LongDataPoint(Constant.SENSOR_3, i);
+        tsRecord.addTuple(dPoint1);
+        tsRecord.addTuple(dPoint2);
+        tsRecord.addTuple(dPoint3);
+        // write TSRecord
+        tsFileWriter.write(tsRecord);
+      }
+    } catch (Exception e) {
+      // pass the exception itself as the last argument so the stack trace is logged too
+      logger.error("Failed to write TsFile on HDFS. {}", e.getMessage(), e);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d046e11..f40a82d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -340,8 +341,8 @@ public class IoTDBDescriptor {
// At the same time, set TSFileConfig
TSFileDescriptor.getInstance().getConfig()
- .setTSFileStorageFs(
- properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().name()));
+ .setTSFileStorageFs(FSType.valueOf(
+ properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().name())));
TSFileDescriptor.getInstance().getConfig().setCoreSitePath(
properties.getProperty("core_site_path", conf.getCoreSitePath()));
TSFileDescriptor.getInstance().getConfig().setHdfsSitePath(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index 812a223..9a97c65 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -384,8 +384,8 @@ public class TSFileConfig {
return this.TSFileStorageFs;
}
- public void setTSFileStorageFs(String TSFileStorageFs) {
- this.TSFileStorageFs = FSType.valueOf(TSFileStorageFs);
+ public void setTSFileStorageFs(FSType fileStorageFs) {
+ this.TSFileStorageFs = fileStorageFs;
}
public String getCoreSitePath() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/LocalFSInputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/LocalFSInputFactory.java
index 271ce51..64121d5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/LocalFSInputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/LocalFSInputFactory.java
@@ -25,7 +25,7 @@ import java.nio.file.Paths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
public class LocalFSInputFactory implements FileInputFactory {
@@ -34,7 +34,7 @@ public class LocalFSInputFactory implements FileInputFactory {
public TsFileInput getTsFileInput(String filePath) {
try {
- return new DefaultTsFileInput(Paths.get(filePath));
+ return new LocalTsFileInput(Paths.get(filePath));
} catch (IOException e) {
logger.error("Failed to get TsFile input of file: {}, ", filePath, e);
return null;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java
index 52dfd6d..c21caed 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java
@@ -25,7 +25,7 @@ import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.iotdb.tsfile.write.writer.DefaultTsFileOutput;
+import org.apache.iotdb.tsfile.write.writer.LocalTsFileOutput;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
public class LocalFSOutputFactory implements FileOutputFactory {
@@ -34,7 +34,7 @@ public class LocalFSOutputFactory implements FileOutputFactory {
public TsFileOutput getTsFileOutput(String filePath, boolean append) {
try {
- return new DefaultTsFileOutput(new FileOutputStream(filePath, append));
+ return new LocalTsFileOutput(new FileOutputStream(filePath, append));
} catch (IOException e) {
logger.error("Failed to get TsFile output of file: {}, ", filePath, e);
return null;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
similarity index 95%
rename from tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
index d20a313..6c2e701 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
@@ -26,11 +26,11 @@ import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
-public class DefaultTsFileInput implements TsFileInput {
+public class LocalTsFileInput implements TsFileInput {
private FileChannel channel;
- public DefaultTsFileInput(Path file) throws IOException {
+ public LocalTsFileInput(Path file) throws IOException {
channel = FileChannel.open(file, StandardOpenOption.READ);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index da9187b..292e8f8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -146,7 +146,7 @@ public class TsFileWriter implements AutoCloseable {
}
this.pageSize = conf.getPageSizeInByte();
this.chunkGroupSizeThreshold = conf.getGroupSizeInByte();
- config.setTSFileStorageFs(conf.getTSFileStorageFs().name());
+ config.setTSFileStorageFs(conf.getTSFileStorageFs());
if (this.pageSize >= chunkGroupSizeThreshold) {
LOG.warn(
"TsFile's page size {} is greater than chunk group size {}, please enlarge the chunk group"
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
index 04aee9c..d1b0114 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
@@ -43,7 +43,7 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter {
if (resourceLogger.isInfoEnabled()) {
resourceLogger.info("{} writer is opened.", file.getName());
}
- this.out = new DefaultTsFileOutput(file, true);
+ this.out = new LocalTsFileOutput(file, true);
this.file = file;
// file doesn't exist
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
similarity index 85%
rename from tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
index 684b270..1e6e105 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
@@ -31,22 +31,17 @@ import java.nio.ByteBuffer;
* existed, it will be created. Otherwise the file will be written from position
* 0.
*/
-public class DefaultTsFileOutput implements TsFileOutput {
+public class LocalTsFileOutput implements TsFileOutput {
private FileOutputStream outputStream;
private BufferedOutputStream bufferedStream;
- DefaultTsFileOutput(File file) throws FileNotFoundException {
- this.outputStream = new FileOutputStream(file);
- this.bufferedStream = new BufferedOutputStream(outputStream);
- }
-
- DefaultTsFileOutput(File file, boolean append) throws FileNotFoundException {
+ LocalTsFileOutput(File file, boolean append) throws FileNotFoundException {
this.outputStream = new FileOutputStream(file, append);
this.bufferedStream = new BufferedOutputStream(outputStream);
}
- public DefaultTsFileOutput(FileOutputStream outputStream) {
+ public LocalTsFileOutput(FileOutputStream outputStream) {
this.outputStream = outputStream;
this.bufferedStream = new BufferedOutputStream(outputStream);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index fcae9cd..ee94550 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -47,8 +47,6 @@ import org.slf4j.LoggerFactory;
*/
public class RestorableTsFileIOWriter extends TsFileIOWriter {
- private static final Logger logger = LoggerFactory
- .getLogger(RestorableTsFileIOWriter.class);
private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
private long truncatedPosition = -1;
private Map<Path, MeasurementSchema> knownSchemas = new HashMap<>();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 658297e..6f0ed79 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BytesUtils;
@@ -98,7 +99,7 @@ public class TsFileIOWriter {
* @throws IOException if I/O error occurs
*/
public TsFileIOWriter(File file) throws IOException {
- this.out = new DefaultTsFileOutput(file);
+ this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), false);
this.file = file;
if (resourceLogger.isDebugEnabled()) {
resourceLogger.debug("{} writer is opened.", file.getName());
@@ -229,7 +230,7 @@ public class TsFileIOWriter {
}
Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList(chunkMetadataListMap);
-
+
TsFileMetadata tsFileMetaData = new TsFileMetadata();
tsFileMetaData.setDeviceMetadataIndex(deviceMetaDataMap);
tsFileMetaData.setVersionInfo(versionInfo);