You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/04/07 01:41:28 UTC
[incubator-iotdb] 01/01: Add TsFile writing to HDFS example
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch tsfile_hdfs_example
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 920fd7b78e5fcdae3afa7459839aeef6cbb03420
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Tue Apr 7 09:40:55 2020 +0800
Add TsFile writing to HDFS example
---
.../iotdb/hadoop/tsfile/TsFileWriteToHDFS.java | 77 ++++++++++++++++++++++
.../fileOutputFactory/LocalFSOutputFactory.java | 4 +-
.../write/writer/ForceAppendTsFileWriter.java | 2 +-
...ultTsFileOutput.java => LocalTsFileOutput.java} | 11 +---
.../write/writer/RestorableTsFileIOWriter.java | 2 -
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 5 +-
6 files changed, 86 insertions(+), 15 deletions(-)
diff --git a/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java b/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java
new file mode 100644
index 0000000..bdd6fd9
--- /dev/null
+++ b/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.hadoop.tsfile;
+
+import java.io.File;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+public class TsFileWriteToHDFS {
+
+ private static TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+
+
+ public static void main(String[] args) {
+ config.setTSFileStorageFs("HDFS");
+
+ try {
+ String path = "hdfs://localhost:9000/test.tsfile";
+ File f = FSFactoryProducer.getFSFactory().getFile(path);
+ if (!f.exists()) {
+ f.createNewFile();
+ }
+ TsFileWriter tsFileWriter = new TsFileWriter(f);
+ tsFileWriter.registerTimeseries(new Path(Constant.DEVICE_1, Constant.SENSOR_1),
+ new MeasurementSchema(Constant.SENSOR_1, TSDataType.INT64, TSEncoding.RLE));
+ tsFileWriter.registerTimeseries(new Path(Constant.DEVICE_1, Constant.SENSOR_2),
+ new MeasurementSchema(Constant.SENSOR_2, TSDataType.INT64, TSEncoding.RLE));
+ tsFileWriter.registerTimeseries(new Path(Constant.DEVICE_1, Constant.SENSOR_3),
+ new MeasurementSchema(Constant.SENSOR_3, TSDataType.INT64, TSEncoding.RLE));
+
+ // construct TSRecord
+ for (int i = 0; i < 100; i++) {
+ TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_1);
+ DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, i);
+ DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, i);
+ DataPoint dPoint3 = new LongDataPoint(Constant.SENSOR_3, i);
+ tsRecord.addTuple(dPoint1);
+ tsRecord.addTuple(dPoint2);
+ tsRecord.addTuple(dPoint3);
+
+ // write TSRecord
+ tsFileWriter.write(tsRecord);
+ }
+
+ tsFileWriter.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.out.println(e.getMessage());
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java
index 52dfd6d..c21caed 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java
@@ -25,7 +25,7 @@ import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.iotdb.tsfile.write.writer.DefaultTsFileOutput;
+import org.apache.iotdb.tsfile.write.writer.LocalTsFileOutput;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
public class LocalFSOutputFactory implements FileOutputFactory {
@@ -34,7 +34,7 @@ public class LocalFSOutputFactory implements FileOutputFactory {
public TsFileOutput getTsFileOutput(String filePath, boolean append) {
try {
- return new DefaultTsFileOutput(new FileOutputStream(filePath, append));
+ return new LocalTsFileOutput(new FileOutputStream(filePath, append));
} catch (IOException e) {
logger.error("Failed to get TsFile output of file: {}, ", filePath, e);
return null;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
index 04aee9c..d1b0114 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
@@ -43,7 +43,7 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter {
if (resourceLogger.isInfoEnabled()) {
resourceLogger.info("{} writer is opened.", file.getName());
}
- this.out = new DefaultTsFileOutput(file, true);
+ this.out = new LocalTsFileOutput(file, true);
this.file = file;
// file doesn't exist
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
similarity index 85%
rename from tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
index 684b270..1e6e105 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
@@ -31,22 +31,17 @@ import java.nio.ByteBuffer;
* existed, it will be created. Otherwise the file will be written from position
* 0.
*/
-public class DefaultTsFileOutput implements TsFileOutput {
+public class LocalTsFileOutput implements TsFileOutput {
private FileOutputStream outputStream;
private BufferedOutputStream bufferedStream;
- DefaultTsFileOutput(File file) throws FileNotFoundException {
- this.outputStream = new FileOutputStream(file);
- this.bufferedStream = new BufferedOutputStream(outputStream);
- }
-
- DefaultTsFileOutput(File file, boolean append) throws FileNotFoundException {
+ LocalTsFileOutput(File file, boolean append) throws FileNotFoundException {
this.outputStream = new FileOutputStream(file, append);
this.bufferedStream = new BufferedOutputStream(outputStream);
}
- public DefaultTsFileOutput(FileOutputStream outputStream) {
+ public LocalTsFileOutput(FileOutputStream outputStream) {
this.outputStream = outputStream;
this.bufferedStream = new BufferedOutputStream(outputStream);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index fcae9cd..ee94550 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -47,8 +47,6 @@ import org.slf4j.LoggerFactory;
*/
public class RestorableTsFileIOWriter extends TsFileIOWriter {
- private static final Logger logger = LoggerFactory
- .getLogger(RestorableTsFileIOWriter.class);
private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
private long truncatedPosition = -1;
private Map<Path, MeasurementSchema> knownSchemas = new HashMap<>();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index a8146c8..d8d9ad2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BytesUtils;
@@ -98,7 +99,7 @@ public class TsFileIOWriter {
* @throws IOException if I/O error occurs
*/
public TsFileIOWriter(File file) throws IOException {
- this.out = new DefaultTsFileOutput(file);
+ this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), false);
this.file = file;
if (resourceLogger.isInfoEnabled()) {
resourceLogger.info("{} writer is opened.", file.getName());
@@ -234,7 +235,7 @@ public class TsFileIOWriter {
}
Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList(chunkMetadataListMap);
-
+
TsFileMetadata tsFileMetaData = new TsFileMetadata();
tsFileMetaData.setDeviceMetadataIndex(deviceMetaDataMap);
tsFileMetaData.setVersionInfo(versionInfo);