You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/08 01:41:52 UTC

[incubator-iotdb] branch master updated: Add TsFile writing to HDFS example (#994)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d77616  Add TsFile writing to HDFS example (#994)
9d77616 is described below

commit 9d77616c80b861be8a13de06986533eee8b8e8be
Author: Zesong Sun <sz...@mails.tsinghua.edu.cn>
AuthorDate: Wed Apr 8 09:41:43 2020 +0800

    Add TsFile writing to HDFS example (#994)
    
    * Add TsFile writing to HDFS example
    * Rename DefaultTsFileInput to LocalTsFileInput
---
 .../iotdb/hadoop/tsfile/TsFileWriteToHDFS.java     | 73 ++++++++++++++++++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  5 +-
 .../iotdb/tsfile/common/conf/TSFileConfig.java     |  4 +-
 .../fileInputFactory/LocalFSInputFactory.java      |  4 +-
 .../fileOutputFactory/LocalFSOutputFactory.java    |  4 +-
 ...faultTsFileInput.java => LocalTsFileInput.java} |  4 +-
 .../apache/iotdb/tsfile/write/TsFileWriter.java    |  2 +-
 .../write/writer/ForceAppendTsFileWriter.java      |  2 +-
 ...ultTsFileOutput.java => LocalTsFileOutput.java} | 11 +---
 .../write/writer/RestorableTsFileIOWriter.java     |  2 -
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  5 +-
 11 files changed, 92 insertions(+), 24 deletions(-)

diff --git a/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java b/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java
new file mode 100644
index 0000000..f44be96
--- /dev/null
+++ b/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.hadoop.tsfile;
+
+import java.io.File;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Example that writes a TsFile holding three INT64 series of one device to HDFS. */
+public class TsFileWriteToHDFS {
+
+  private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+  private static final Logger logger = LoggerFactory.getLogger(TsFileWriteToHDFS.class);
+
+  /** Switches the TsFile storage file system to HDFS, then writes 100 records. */
+  public static void main(String[] args) {
+    config.setTSFileStorageFs(FSType.HDFS);
+    String path = "hdfs://localhost:9000/test.tsfile";
+    File f = FSFactoryProducer.getFSFactory().getFile(path);
+    try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+      tsFileWriter.registerTimeseries(new Path(Constant.DEVICE_1, Constant.SENSOR_1),
+          new MeasurementSchema(Constant.SENSOR_1, TSDataType.INT64, TSEncoding.RLE));
+      tsFileWriter.registerTimeseries(new Path(Constant.DEVICE_1, Constant.SENSOR_2),
+          new MeasurementSchema(Constant.SENSOR_2, TSDataType.INT64, TSEncoding.RLE));
+      tsFileWriter.registerTimeseries(new Path(Constant.DEVICE_1, Constant.SENSOR_3),
+          new MeasurementSchema(Constant.SENSOR_3, TSDataType.INT64, TSEncoding.RLE));
+
+      // construct one TSRecord per timestamp i and write it; each sensor stores i
+      for (int i = 0; i < 100; i++) {
+        TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_1);
+        DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, i);
+        DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, i);
+        DataPoint dPoint3 = new LongDataPoint(Constant.SENSOR_3, i);
+        tsRecord.addTuple(dPoint1);
+        tsRecord.addTuple(dPoint2);
+        tsRecord.addTuple(dPoint3);
+        tsFileWriter.write(tsRecord);
+      }
+    } catch (Exception e) {
+      // pass e as the trailing argument so SLF4J logs the stack trace, not just the message
+      logger.error("Failed to write TsFile on HDFS. {}", e.getMessage(), e);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d046e11..f40a82d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -340,8 +341,8 @@ public class IoTDBDescriptor {
 
       // At the same time, set TSFileConfig
       TSFileDescriptor.getInstance().getConfig()
-          .setTSFileStorageFs(
-              properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().name()));
+          .setTSFileStorageFs(FSType.valueOf(
+              properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().name())));
       TSFileDescriptor.getInstance().getConfig().setCoreSitePath(
           properties.getProperty("core_site_path", conf.getCoreSitePath()));
       TSFileDescriptor.getInstance().getConfig().setHdfsSitePath(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index 812a223..9a97c65 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -384,8 +384,8 @@ public class TSFileConfig {
     return this.TSFileStorageFs;
   }
 
-  public void setTSFileStorageFs(String TSFileStorageFs) {
-    this.TSFileStorageFs = FSType.valueOf(TSFileStorageFs);
+  public void setTSFileStorageFs(FSType fileStorageFs) {
+    this.TSFileStorageFs = fileStorageFs;
   }
 
   public String getCoreSitePath() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/LocalFSInputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/LocalFSInputFactory.java
index 271ce51..64121d5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/LocalFSInputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/LocalFSInputFactory.java
@@ -25,7 +25,7 @@ import java.nio.file.Paths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 
 public class LocalFSInputFactory implements FileInputFactory {
@@ -34,7 +34,7 @@ public class LocalFSInputFactory implements FileInputFactory {
 
   public TsFileInput getTsFileInput(String filePath) {
     try {
-      return new DefaultTsFileInput(Paths.get(filePath));
+      return new LocalTsFileInput(Paths.get(filePath));
     } catch (IOException e) {
       logger.error("Failed to get TsFile input of file: {}, ", filePath, e);
       return null;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java
index 52dfd6d..c21caed 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java
@@ -25,7 +25,7 @@ import java.io.IOException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.iotdb.tsfile.write.writer.DefaultTsFileOutput;
+import org.apache.iotdb.tsfile.write.writer.LocalTsFileOutput;
 import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
 
 public class LocalFSOutputFactory implements FileOutputFactory {
@@ -34,7 +34,7 @@ public class LocalFSOutputFactory implements FileOutputFactory {
 
   public TsFileOutput getTsFileOutput(String filePath, boolean append) {
     try {
-      return new DefaultTsFileOutput(new FileOutputStream(filePath, append));
+      return new LocalTsFileOutput(new FileOutputStream(filePath, append));
     } catch (IOException e) {
       logger.error("Failed to get TsFile output of file: {}, ", filePath, e);
       return null;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
similarity index 95%
rename from tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
index d20a313..6c2e701 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
@@ -26,11 +26,11 @@ import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 
-public class DefaultTsFileInput implements TsFileInput {
+public class LocalTsFileInput implements TsFileInput {
 
   private FileChannel channel;
 
-  public DefaultTsFileInput(Path file) throws IOException {
+  public LocalTsFileInput(Path file) throws IOException {
     channel = FileChannel.open(file, StandardOpenOption.READ);
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index da9187b..292e8f8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -146,7 +146,7 @@ public class TsFileWriter implements AutoCloseable {
     }
     this.pageSize = conf.getPageSizeInByte();
     this.chunkGroupSizeThreshold = conf.getGroupSizeInByte();
-    config.setTSFileStorageFs(conf.getTSFileStorageFs().name());
+    config.setTSFileStorageFs(conf.getTSFileStorageFs());
     if (this.pageSize >= chunkGroupSizeThreshold) {
       LOG.warn(
           "TsFile's page size {} is greater than chunk group size {}, please enlarge the chunk group"
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
index 04aee9c..d1b0114 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
@@ -43,7 +43,7 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter {
     if (resourceLogger.isInfoEnabled()) {
       resourceLogger.info("{} writer is opened.", file.getName());
     }
-    this.out = new DefaultTsFileOutput(file, true);
+    this.out = new LocalTsFileOutput(file, true);
     this.file = file;
 
     // file doesn't exist
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
similarity index 85%
rename from tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
index 684b270..1e6e105 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/DefaultTsFileOutput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/LocalTsFileOutput.java
@@ -31,22 +31,17 @@ import java.nio.ByteBuffer;
  * existed, it will be created. Otherwise the file will be written from position
  * 0.
  */
-public class DefaultTsFileOutput implements TsFileOutput {
+public class LocalTsFileOutput implements TsFileOutput {
 
   private FileOutputStream outputStream;
   private BufferedOutputStream bufferedStream;
 
-  DefaultTsFileOutput(File file) throws FileNotFoundException {
-    this.outputStream = new FileOutputStream(file);
-    this.bufferedStream = new BufferedOutputStream(outputStream);
-  }
-
-  DefaultTsFileOutput(File file, boolean append) throws FileNotFoundException {
+  LocalTsFileOutput(File file, boolean append) throws FileNotFoundException {
     this.outputStream = new FileOutputStream(file, append);
     this.bufferedStream = new BufferedOutputStream(outputStream);
   }
 
-  public DefaultTsFileOutput(FileOutputStream outputStream) {
+  public LocalTsFileOutput(FileOutputStream outputStream) {
     this.outputStream = outputStream;
     this.bufferedStream = new BufferedOutputStream(outputStream);
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index fcae9cd..ee94550 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -47,8 +47,6 @@ import org.slf4j.LoggerFactory;
  */
 public class RestorableTsFileIOWriter extends TsFileIOWriter {
 
-  private static final Logger logger = LoggerFactory
-      .getLogger(RestorableTsFileIOWriter.class);
   private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
   private long truncatedPosition = -1;
   private Map<Path, MeasurementSchema> knownSchemas = new HashMap<>();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 658297e..6f0ed79 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
@@ -98,7 +99,7 @@ public class TsFileIOWriter {
    * @throws IOException if I/O error occurs
    */
   public TsFileIOWriter(File file) throws IOException {
-    this.out = new DefaultTsFileOutput(file);
+    this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), false);
     this.file = file;
     if (resourceLogger.isDebugEnabled()) {
       resourceLogger.debug("{} writer is opened.", file.getName());
@@ -229,7 +230,7 @@ public class TsFileIOWriter {
     }
 
     Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList(chunkMetadataListMap);
-    
+
     TsFileMetadata tsFileMetaData = new TsFileMetadata();
     tsFileMetaData.setDeviceMetadataIndex(deviceMetaDataMap);
     tsFileMetaData.setVersionInfo(versionInfo);