You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/09 05:37:56 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12][ISSUE-3378] Fix NPE when clear upgrade folder; Fix some upgraded pageHeader missing statistics (#3376)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 33ee473 [To rel/0.12][ISSUE-3378] Fix NPE when clear upgrade folder; Fix some upgraded pageHeader missing statistics (#3376)
33ee473 is described below
commit 33ee473d1f2476180b4b6207fc1d25ef73205a43
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Jun 9 13:37:17 2021 +0800
[To rel/0.12][ISSUE-3378] Fix NPE when clear upgrade folder; Fix some upgraded pageHeader missing statistics (#3376)
---
.../engine/storagegroup/StorageGroupProcessor.java | 3 +++
.../iotdb/db/engine/upgrade/UpgradeTask.java | 7 ++++-
.../apache/iotdb/db/tools/TsFileRewriteTool.java | 12 +++------
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 31 ++++++++++++++++------
4 files changed, 35 insertions(+), 18 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b2d4b60..ff626d6 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2101,6 +2101,7 @@ public class StorageGroupProcessor {
return;
}
for (TsFileResource resource : resources) {
+ resource.writeLock();
try {
UpgradeUtils.moveUpgradedFiles(resource);
tsFileManagement.addAll(resource.getUpgradedResources(), isseq);
@@ -2114,6 +2115,8 @@ public class StorageGroupProcessor {
resource.getTsFile().getAbsolutePath() + "," + UpgradeCheckStatus.UPGRADE_SUCCESS);
} catch (IOException e) {
logger.error("Unable to load {}, caused by ", resource, e);
+ } finally {
+ resource.writeUnlock();
}
}
// delete upgrade folder when it is empty
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
index 76b1bac..bcf29d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
@@ -139,8 +139,13 @@ public class UpgradeTask extends WrappedRunnable {
}
File virtualStorageGroupDir = fsFactory.getFile(storageGroup, "0");
File upgradeDir = fsFactory.getFile(virtualStorageGroupDir, "upgrade");
-
+ if (upgradeDir == null) {
+ continue;
+ }
File[] tmpPartitionDirList = upgradeDir.listFiles();
+ if (tmpPartitionDirList == null) {
+ continue;
+ }
for (File tmpPartitionDir : tmpPartitionDirList) {
if (tmpPartitionDir.isDirectory()) {
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index 955ed77..e1e4bc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -319,17 +319,12 @@ public class TsFileRewriteTool implements AutoCloseable {
List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i);
List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i);
valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
- boolean isOnlyOnePageChunk = pageDataInChunk.size() == 1;
for (int j = 0; j < pageDataInChunk.size(); j++) {
if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) {
decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup);
} else {
writePageInToFile(
- schema,
- pageHeadersInChunk.get(j),
- pageDataInChunk.get(j),
- chunkWritersInChunkGroup,
- isOnlyOnePageChunk);
+ schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j), chunkWritersInChunkGroup);
}
}
}
@@ -389,15 +384,14 @@ public class TsFileRewriteTool implements AutoCloseable {
MeasurementSchema schema,
PageHeader pageHeader,
ByteBuffer pageData,
- Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup,
- boolean isOnlyOnePageChunk)
+ Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
throws PageException {
long partitionId = StorageEngine.getTimePartition(pageHeader.getStartTime());
getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
Map<MeasurementSchema, ChunkWriterImpl> chunkWriters =
chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>());
ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema));
- chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader, isOnlyOnePageChunk);
+ chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader);
chunkWriters.put(schema, chunkWriter);
chunkWritersInChunkGroup.put(partitionId, chunkWriters);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index 39e5f79..48ff92c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -294,8 +294,7 @@ public class ChunkWriterImpl implements IChunkWriter {
if (numOfPages == 0) { // record the firstPageStatistics
this.firstPageStatistics = pageWriter.getStatistics();
this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
- } else if (numOfPages == 1
- && firstPageStatistics != null) { // put the firstPageStatistics into pageBuffer
+ } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
byte[] b = pageBuffer.toByteArray();
pageBuffer.reset();
pageBuffer.write(b, 0, this.sizeWithoutStatistic);
@@ -374,16 +373,32 @@ public class ChunkWriterImpl implements IChunkWriter {
* write the page header and data into the PageWriter's output stream. @NOTE: for upgrading
* 0.11/v2 to 0.12/v3 TsFile
*/
- public void writePageHeaderAndDataIntoBuff(
- ByteBuffer data, PageHeader header, boolean isOnlyOnePageChunk) throws PageException {
-
+ public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
+ throws PageException {
// write the page header to pageBuffer
try {
logger.debug(
"start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
- ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
- ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
- if (!isOnlyOnePageChunk) {
+ // serialize pageHeader see writePageToPageBuffer method
+ if (numOfPages == 0) { // record the firstPageStatistics
+ this.firstPageStatistics = header.getStatistics();
+ this.sizeWithoutStatistic +=
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ this.sizeWithoutStatistic +=
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+ byte[] b = pageBuffer.toByteArray();
+ pageBuffer.reset();
+ pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+ firstPageStatistics.serialize(pageBuffer);
+ pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ header.getStatistics().serialize(pageBuffer);
+ firstPageStatistics = null;
+ } else {
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
header.getStatistics().serialize(pageBuffer);
}
logger.debug(