Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/06/13 02:00:14 UTC
[incubator-iotdb] branch master updated: [IOTDB-765] Failed to get upgrade.txt file when using HDFS storage (#1357)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 051ef4d [IOTDB-765] Failed to get upgrade.txt file when using HDFS storage (#1357)
051ef4d is described below
commit 051ef4dd9cb418e99dc1a37a16443cf418b98180
Author: Zesong Sun <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sat Jun 13 10:00:04 2020 +0800
[IOTDB-765] Failed to get upgrade.txt file when using HDFS storage (#1357)
---
.../apache/iotdb/db/engine/upgrade/UpgradeLog.java | 5 +-
.../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 121 ++++++++++-----------
2 files changed, 61 insertions(+), 65 deletions(-)
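
Reading the diff, the fix has two parts. UpgradeLog previously obtained its writer from the configured FSFactory, so with HDFS storage enabled the upgrade log (upgrade.txt) was apparently written to HDFS even though it is a local system file, which matches the failure in the issue title; it now uses plain java.io streams. TsFileOnlineUpgradeTool previously hard-coded LocalTsFileInput, which cannot read TsFiles stored on HDFS; it now asks the file-input factory for a TsFileInput matching the configured file system. The two call sites, quoted from the diff with the old lines for contrast:

    // Before: the upgrade log followed the configured FSFactory (HDFS when
    // HDFS storage is on), while TsFile reads were local-only.
    upgradeLogWriter = FSFactoryProducer.getFSFactory()
        .getBufferedWriter(getUpgradeLogPath(), true);
    tsFileInput = new LocalTsFileInput(Paths.get(file));

    // After: the upgrade log is always local; TsFile reads follow the
    // configured file system (local or HDFS).
    upgradeLogWriter = new BufferedWriter(new FileWriter(getUpgradeLogPath(), true));
    tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file);
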
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeLog.java b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeLog.java
index 2c580c0..1da0acf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeLog.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeLog.java
@@ -20,12 +20,12 @@ package org.apache.iotdb.db.engine.upgrade;
import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.utils.UpgradeUtils;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,8 +47,7 @@ public class UpgradeLog {
upgradeLogPath.getParentFile().mkdirs();
}
upgradeLogPath.createNewFile();
- upgradeLogWriter = FSFactoryProducer.getFSFactory()
- .getBufferedWriter(getUpgradeLogPath(), true);
+ upgradeLogWriter = new BufferedWriter(new FileWriter(getUpgradeLogPath(), true));
return true;
} catch (IOException e) {
logger.error("meet error when create upgrade log, file path:{}",
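
For context, here is a sketch of the patched UpgradeLog method reconstructed around the visible hunk; the method name createUpgradeLog, the parent-directory existence check, and the tail of the logger call are assumptions, everything else is taken from the diff:

    public static boolean createUpgradeLog() {
      try {
        File upgradeLogPath = SystemFileFactory.INSTANCE.getFile(getUpgradeLogPath());
        if (!upgradeLogPath.getParentFile().exists()) {
          upgradeLogPath.getParentFile().mkdirs();
        }
        upgradeLogPath.createNewFile();
        // Write through java.io so the log always lands on the local disk;
        // the FSFactory writer would target HDFS when HDFS storage is configured.
        upgradeLogWriter = new BufferedWriter(new FileWriter(getUpgradeLogPath(), true));
        return true;
      } catch (IOException e) {
        logger.error("meet error when create upgrade log, file path:{}",
            getUpgradeLogPath(), e);
        return false;
      }
    }
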
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index 9fc72d2..db973b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -21,13 +21,11 @@ package org.apache.iotdb.db.tools.upgrade;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -47,14 +45,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.v1.file.metadata.ChunkGroupMetaDataV1;
-import org.apache.iotdb.tsfile.v1.file.metadata.TsDeviceMetadataV1;
import org.apache.iotdb.tsfile.v1.file.metadata.TsDeviceMetadataIndexV1;
+import org.apache.iotdb.tsfile.v1.file.metadata.TsDeviceMetadataV1;
import org.apache.iotdb.tsfile.v1.file.metadata.TsFileMetadataV1;
import org.apache.iotdb.tsfile.v1.file.utils.HeaderUtils;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
@@ -77,7 +74,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
TSDataType.INT64);
private Decoder valueDecoder;
protected String file;
-
+
// PartitionId -> TsFileIOWriter
private Map<Long, TsFileIOWriter> partitionWriterMap;
@@ -101,8 +98,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
*/
public TsFileOnlineUpgradeTool(String file, boolean loadMetadataSize) throws IOException {
this.file = file;
- final java.nio.file.Path path = Paths.get(file);
- tsFileInput = new LocalTsFileInput(path);
+ tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file);
partitionWriterMap = new HashMap<>();
try {
if (loadMetadataSize) {
@@ -117,11 +113,10 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
/**
* upgrade a single tsfile
*
- * @param tsfileName old version tsFile's absolute path
+ * @param tsFileName old version tsFile's absolute path
* @param upgradedResources new version tsFiles' resources
- * @throws WriteProcessException
*/
- public static void upgradeOneTsfile(String tsFileName, List<TsFileResource> upgradedResources)
+ public static void upgradeOneTsfile(String tsFileName, List<TsFileResource> upgradedResources)
throws IOException, WriteProcessException {
try (TsFileOnlineUpgradeTool updater = new TsFileOnlineUpgradeTool(tsFileName)) {
updater.upgradeFile(upgradedResources);
@@ -129,7 +124,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
}
/**
- *
+ *
*/
public void loadMetadataSize() throws IOException {
ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
@@ -255,7 +250,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
0);
return uncompressedBuffer;
}
-
+
public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
return readData(-1, header.getCompressedSize());
}
@@ -320,9 +315,10 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
/**
* upgrade file and resource
- * @throws IOException, WriteProcessException
+ *
+ * @throws IOException, WriteProcessException
*/
- public void upgradeFile(List<TsFileResource> upgradedResources)
+ public void upgradeFile(List<TsFileResource> upgradedResources)
throws IOException, WriteProcessException {
File oldTsFile = FSFactoryProducer.getFSFactory().getFile(this.file);
@@ -333,7 +329,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
// ChunkGroupOffset -> version
Map<Long, Long> oldVersionInfo = getVersionInfo();
-
+
// start to scan chunks and chunkGroups
long startOffsetOfChunkGroup = 0;
boolean newChunkGroup = true;
@@ -357,7 +353,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
ChunkHeader header = this.readChunkHeader();
MeasurementSchema measurementSchema = new MeasurementSchema(header.getMeasurementID(),
header.getDataType(),
- header.getEncodingType(),
+ header.getEncodingType(),
header.getCompressionType());
measurementSchemaList.add(measurementSchema);
List<PageHeader> pageHeadersInChunk = new ArrayList<>();
@@ -367,8 +363,9 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
PageHeader pageHeader = readPageHeader(header.getDataType());
boolean pageInSamePartition = checkIfPageInSameTimePartition(pageHeader);
pagePartitionInfo.add(pageInSamePartition);
- ByteBuffer pageData = pageInSamePartition ?
- readCompressedPage(pageHeader) : readPage(pageHeader, header.getCompressionType());
+ ByteBuffer pageData = pageInSamePartition ?
+ readCompressedPage(pageHeader)
+ : readPage(pageHeader, header.getCompressionType());
pageHeadersInChunk.add(pageHeader);
dataInChunk.add(pageData);
}
@@ -417,17 +414,14 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
}
/**
- * This method is for rewriting the ChunkGroup which data is in the different time partitions.
- * In this case, we have to decode the data to points,
- * and then rewrite the data points to different chunkWriters,
- * finally write chunks to their own upgraded TsFiles
- * @param pageHeadersInChunkGroup
- * @throws PageException
+ * This method is for rewriting the ChunkGroup which data is in the different time partitions. In
+ * this case, we have to decode the data to points, and then rewrite the data points to different
+ * chunkWriters, finally write chunks to their own upgraded TsFiles
*/
- private void rewrite(File oldTsFile, String deviceId, List<MeasurementSchema> schemas,
- List<List<PageHeader>> pageHeadersInChunkGroup, List<List<ByteBuffer>> dataInChunkGroup,
- long versionOfChunkGroup, List<List<Boolean>> pagePartitionInfoInChunkGroup)
- throws IOException, PageException {
+ private void rewrite(File oldTsFile, String deviceId, List<MeasurementSchema> schemas,
+ List<List<PageHeader>> pageHeadersInChunkGroup, List<List<ByteBuffer>> dataInChunkGroup,
+ long versionOfChunkGroup, List<List<Boolean>> pagePartitionInfoInChunkGroup)
+ throws IOException, PageException {
Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>();
for (int i = 0; i < schemas.size(); i++) {
MeasurementSchema schema = schemas.get(i);
@@ -438,17 +432,17 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
.getDecoderByType(schema.getEncodingType(), schema.getType());
for (int j = 0; j < pageDataInChunk.size(); j++) {
if (Boolean.TRUE.equals(pagePartitionInfo.get(j))) {
- writePageInSamePartitionToFile(oldTsFile, schema, pageHeadersInChunk.get(j),
+ writePageInSamePartitionToFile(oldTsFile, schema, pageHeadersInChunk.get(j),
pageDataInChunk.get(j), chunkWritersInChunkGroup);
- }
- else {
- writePageInDifferentPartitionsToFiles(oldTsFile, schema, pageDataInChunk.get(j),
+ } else {
+ writePageInDifferentPartitionsToFiles(oldTsFile, schema, pageDataInChunk.get(j),
chunkWritersInChunkGroup);
}
}
}
-
- for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry : chunkWritersInChunkGroup.entrySet()) {
+
+ for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry : chunkWritersInChunkGroup
+ .entrySet()) {
long partitionId = entry.getKey();
TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId);
tsFileIOWriter.startChunkGroup(deviceId);
@@ -462,32 +456,34 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
}
private TsFileIOWriter getOrDefaultTsFileIOWriter(File oldTsFile, long partition) {
- return partitionWriterMap.computeIfAbsent(partition, k ->
- {
- File partitionDir = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
- + File.separator + partition);
- if (!partitionDir.exists()) {
- partitionDir.mkdirs();
- }
- File newFile = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
- + File.separator + partition + File.separator+ oldTsFile.getName());
- try {
- if (!newFile.createNewFile()) {
- logger.error("The TsFile {} has been created ", newFile);
+ return partitionWriterMap.computeIfAbsent(partition, k ->
+ {
+ File partitionDir = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
+ + File.separator + partition);
+ if (!partitionDir.exists()) {
+ partitionDir.mkdirs();
+ }
+ File newFile = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
+ + File.separator + partition + File.separator + oldTsFile.getName());
+ try {
+ if (!newFile.createNewFile()) {
+ logger.error("The TsFile {} has been created ", newFile);
+ return null;
+ }
+ return new TsFileIOWriter(newFile);
+ } catch (IOException e) {
+ logger.error("Create new TsFile {} failed ", newFile);
return null;
}
- return new TsFileIOWriter(newFile);
- } catch (IOException e) {
- logger.error("Create new TsFile {} failed ", newFile);
- return null;
}
- }
);
}
- private void writePageInSamePartitionToFile(File oldTsFile, MeasurementSchema schema, PageHeader pageHeader,
- ByteBuffer pageData, Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
- throws PageException {
+ private void writePageInSamePartitionToFile(File oldTsFile, MeasurementSchema schema,
+ PageHeader pageHeader,
+ ByteBuffer pageData,
+ Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+ throws PageException {
long partitionId = StorageEngine.getTimePartition(pageHeader.getStartTime());
getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
Map<MeasurementSchema, ChunkWriterImpl> chunkWriters = chunkWritersInChunkGroup
@@ -500,8 +496,9 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
}
private void writePageInDifferentPartitionsToFiles(File oldTsFile, MeasurementSchema schema,
- ByteBuffer pageData, Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
- throws IOException {
+ ByteBuffer pageData,
+ Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+ throws IOException {
valueDecoder.reset();
PageReader pageReader = new PageReader(pageData, schema.getType(), valueDecoder,
defaultTimeDecoder, null);
@@ -510,7 +507,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
long time = batchData.currentTime();
Object value = batchData.currentValue();
long partitionId = StorageEngine.getTimePartition(time);
-
+
Map<MeasurementSchema, ChunkWriterImpl> chunkWriters = chunkWritersInChunkGroup
.getOrDefault(partitionId, new HashMap<>());
ChunkWriterImpl chunkWriter = chunkWriters
@@ -546,9 +543,7 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
}
/**
- * check if the file to be upgraded has correct magic strings and version number
- * @param oldTsFile
- * @throws IOException
+ * check if the file to be upgraded has correct magic strings and version number
*/
private boolean fileCheck(File oldTsFile) throws IOException {
long fileSize;
@@ -602,12 +597,14 @@ public class TsFileOnlineUpgradeTool implements AutoCloseable {
return versionInfo;
}
- private TsFileResource endFileAndGenerateResource(TsFileIOWriter tsFileIOWriter) throws IOException {
+ private TsFileResource endFileAndGenerateResource(TsFileIOWriter tsFileIOWriter)
+ throws IOException {
tsFileIOWriter.endFile();
TsFileResource tsFileResource = new TsFileResource(tsFileIOWriter.getFile());
Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = tsFileIOWriter
.getDeviceTimeseriesMetadataMap();
- for (Map.Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap.entrySet()) {
+ for (Map.Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap
+ .entrySet()) {
String device = entry.getKey();
for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
tsFileResource.updateStartTime(device, timeseriesMetaData.getStatistics().getStartTime());
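
To show how the upgraded entry point is meant to be driven, here is a minimal hypothetical caller; upgradeOneTsfile, TsFileResource, and the per-partition output layout come from the diff above, while the class name, file path, and the printed summary are placeholders:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
    import org.apache.iotdb.db.tools.upgrade.TsFileOnlineUpgradeTool;

    public class UpgradeOneFileDemo {
      public static void main(String[] args) throws Exception {
        List<TsFileResource> upgradedResources = new ArrayList<>();
        // Rewrites an old v1 TsFile into one new TsFile per time partition;
        // per getOrDefaultTsFileIOWriter, each new file is created under
        // <oldParent>/<partitionId>/<oldName>, and a TsFileResource for each
        // finished file is appended to the list.
        TsFileOnlineUpgradeTool.upgradeOneTsfile(
            "/data/sequence/root.sg1/old.tsfile", upgradedResources);
        System.out.println("upgraded into " + upgradedResources.size() + " partition file(s)");
      }
    }
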