You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/08/06 05:06:06 UTC
[iotdb] branch master updated: Cherry-pick 4 bug-fixing commits
about upgrade tool from rel/0.12 to master branch (#3698)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3234abf Cherry-pick 4 bug-fixing commits about upgrade tool from rel/0.12 to master branch (#3698)
3234abf is described below
commit 3234abf97f1f7900eb7da2a3d68c1f5ceeb91bde
Author: Haonan <hh...@outlook.com>
AuthorDate: Fri Aug 6 13:05:45 2021 +0800
Cherry-pick 4 bug-fixing commits about upgrade tool from rel/0.12 to master branch (#3698)
* Fix upgrade tool cannot close file reader (#3319)
* [To rel/0.12] Fix removing tmp folders logic in upgrade tool (#3186)
* [To rel/0.12] Fix upgrade NPE and DeadLock (#3329)
* Fix upgrade NPE and DeadLock
* Fix get ungrade file num deadlock
* [ISSUE-3378] Fix NPE when clear upgrade folder; Fix some upgraded pageHeader missing statistics (#3376)
---
.../engine/storagegroup/StorageGroupProcessor.java | 3 ++
.../iotdb/db/engine/upgrade/UpgradeTask.java | 60 ++++++++++++++++++----
.../org/apache/iotdb/db/service/UpgradeSevice.java | 24 ++-------
.../apache/iotdb/db/tools/TsFileRewriteTool.java | 30 +++++++----
.../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 4 +-
.../org/apache/iotdb/db/utils/UpgradeUtils.java | 10 ----
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 28 +++++++---
7 files changed, 102 insertions(+), 57 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index af54491..77d68b1 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2180,6 +2180,7 @@ public class StorageGroupProcessor {
return;
}
for (TsFileResource resource : resources) {
+ resource.writeLock();
try {
UpgradeUtils.moveUpgradedFiles(resource);
tsFileManagement.addAll(resource.getUpgradedResources(), isseq);
@@ -2193,6 +2194,8 @@ public class StorageGroupProcessor {
resource.getTsFile().getAbsolutePath() + "," + UpgradeCheckStatus.UPGRADE_SUCCESS);
} catch (IOException e) {
logger.error("Unable to load {}, caused by ", resource, e);
+ } finally {
+ resource.writeUnlock();
}
}
// delete upgrade folder when it is empty
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
index a2e8357..bcf29d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.upgrade;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.tools.upgrade.TsFileOnlineUpgradeTool;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
@@ -59,19 +61,17 @@ public class UpgradeTask extends WrappedRunnable {
logger.info("find upgraded file for {}", upgradeResource.getTsFile());
upgradedResources = findUpgradedFiles();
}
- upgradeResource.writeLock();
- try {
- upgradeResource.setUpgradedResources(upgradedResources);
- upgradeResource.getUpgradeTsFileResourceCallBack().call(upgradeResource);
- } finally {
- upgradeResource.writeUnlock();
- }
- UpgradeSevice.setCntUpgradeFileNum(UpgradeSevice.getCntUpgradeFileNum() - 1);
+ upgradeResource.setUpgradedResources(upgradedResources);
+ upgradeResource.getUpgradeTsFileResourceCallBack().call(upgradeResource);
+ UpgradeSevice.getTotalUpgradeFileNum().getAndAdd(-1);
logger.info(
"Upgrade completes, file path:{} , the remaining upgraded file num: {}",
oldTsfilePath,
- UpgradeSevice.getCntUpgradeFileNum());
- if (UpgradeSevice.getCntUpgradeFileNum() == 0) {
+ UpgradeSevice.getTotalUpgradeFileNum().get());
+ if (UpgradeSevice.getTotalUpgradeFileNum().get() == 0) {
+ logger.info("Start delete empty tmp folders");
+ clearTmpFolders(DirectoryManager.getInstance().getAllSequenceFileFolders());
+ clearTmpFolders(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
UpgradeSevice.getINSTANCE().stop();
logger.info("All files upgraded successfully! ");
}
@@ -126,4 +126,44 @@ public class UpgradeTask extends WrappedRunnable {
}
return upgradedResources;
}
+
+ private void clearTmpFolders(List<String> folders) {
+ for (String baseDir : folders) {
+ File fileFolder = fsFactory.getFile(baseDir);
+ if (!fileFolder.isDirectory()) {
+ continue;
+ }
+ for (File storageGroup : fileFolder.listFiles()) {
+ if (!storageGroup.isDirectory()) {
+ continue;
+ }
+ File virtualStorageGroupDir = fsFactory.getFile(storageGroup, "0");
+ File upgradeDir = fsFactory.getFile(virtualStorageGroupDir, "upgrade");
+ if (upgradeDir == null) {
+ continue;
+ }
+ File[] tmpPartitionDirList = upgradeDir.listFiles();
+ if (tmpPartitionDirList == null) {
+ continue;
+ }
+ for (File tmpPartitionDir : tmpPartitionDirList) {
+ if (tmpPartitionDir.isDirectory()) {
+ try {
+ Files.delete(tmpPartitionDir.toPath());
+ } catch (IOException e) {
+ logger.error("Delete tmpPartitionDir {} failed", tmpPartitionDir);
+ }
+ }
+ }
+ // delete upgrade folder when it is empty
+ if (upgradeDir.isDirectory()) {
+ try {
+ Files.delete(upgradeDir.toPath());
+ } catch (IOException e) {
+ logger.error("Delete tmpUpgradeDir {} failed", upgradeDir);
+ }
+ }
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/UpgradeSevice.java b/server/src/main/java/org/apache/iotdb/db/service/UpgradeSevice.java
index 4468208..faa3c39 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/UpgradeSevice.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/UpgradeSevice.java
@@ -38,7 +38,7 @@ public class UpgradeSevice implements IService {
private ExecutorService upgradeThreadPool;
private AtomicInteger threadCnt = new AtomicInteger();
- private static int cntUpgradeFileNum;
+ private static AtomicInteger cntUpgradeFileNum = new AtomicInteger();
private UpgradeSevice() {}
@@ -63,7 +63,7 @@ public class UpgradeSevice implements IService {
updateThreadNum, r -> new Thread(r, "UpgradeThread-" + threadCnt.getAndIncrement()));
UpgradeLog.createUpgradeLog();
countUpgradeFiles();
- if (cntUpgradeFileNum == 0) {
+ if (cntUpgradeFileNum.get() == 0) {
stop();
return;
}
@@ -87,22 +87,8 @@ public class UpgradeSevice implements IService {
return ServiceType.UPGRADE_SERVICE;
}
- public static void setCntUpgradeFileNum(int cntUpgradeFileNum) {
- UpgradeUtils.getCntUpgradeFileLock().writeLock().lock();
- try {
- UpgradeSevice.cntUpgradeFileNum = cntUpgradeFileNum;
- } finally {
- UpgradeUtils.getCntUpgradeFileLock().writeLock().unlock();
- }
- }
-
- public static int getCntUpgradeFileNum() {
- UpgradeUtils.getCntUpgradeFileLock().readLock().lock();
- try {
- return cntUpgradeFileNum;
- } finally {
- UpgradeUtils.getCntUpgradeFileLock().readLock().unlock();
- }
+ public static AtomicInteger getTotalUpgradeFileNum() {
+ return cntUpgradeFileNum;
}
public void submitUpgradeTask(UpgradeTask upgradeTask) {
@@ -110,7 +96,7 @@ public class UpgradeSevice implements IService {
}
private static void countUpgradeFiles() {
- cntUpgradeFileNum = StorageEngine.getInstance().countUpgradeFiles();
+ cntUpgradeFileNum.addAndGet(StorageEngine.getInstance().countUpgradeFiles());
logger.info("finish counting upgrading files, total num:{}", cntUpgradeFileNum);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index b356dd3..c52457b 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -107,6 +108,23 @@ public class TsFileRewriteTool implements AutoCloseable {
}
}
+ public TsFileRewriteTool(TsFileResource resourceToBeRewritten, boolean needReaderForV2)
+ throws IOException {
+ oldTsFile = resourceToBeRewritten.getTsFile();
+ String file = oldTsFile.getAbsolutePath();
+ if (needReaderForV2) {
+ reader = new TsFileSequenceReaderForV2(file);
+ } else {
+ reader = new TsFileSequenceReader(file);
+ }
+ partitionWriterMap = new HashMap<>();
+ if (FSFactoryProducer.getFSFactory().getFile(file + ModificationFile.FILE_SUFFIX).exists()) {
+ oldModification = (List<Modification>) resourceToBeRewritten.getModFile().getModifications();
+ modsIterator = oldModification.iterator();
+ fileModificationMap = new HashMap<>();
+ }
+ }
+
/**
* Rewrite an old file to the latest version
*
@@ -302,17 +320,12 @@ public class TsFileRewriteTool implements AutoCloseable {
List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i);
List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i);
valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
- boolean isOnlyOnePageChunk = pageDataInChunk.size() == 1;
for (int j = 0; j < pageDataInChunk.size(); j++) {
if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) {
decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup);
} else {
writePageInToFile(
- schema,
- pageHeadersInChunk.get(j),
- pageDataInChunk.get(j),
- chunkWritersInChunkGroup,
- isOnlyOnePageChunk);
+ schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j), chunkWritersInChunkGroup);
}
}
}
@@ -372,15 +385,14 @@ public class TsFileRewriteTool implements AutoCloseable {
IMeasurementSchema schema,
PageHeader pageHeader,
ByteBuffer pageData,
- Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup,
- boolean isOnlyOnePageChunk)
+ Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
throws PageException {
long partitionId = StorageEngine.getTimePartition(pageHeader.getStartTime());
getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
Map<IMeasurementSchema, ChunkWriterImpl> chunkWriters =
chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>());
ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema));
- chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader, isOnlyOnePageChunk);
+ chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader);
chunkWriters.put(schema, chunkWriter);
chunkWritersInChunkGroup.put(partitionId, chunkWriters);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index 6d0192c..8b04bba 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -62,9 +62,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
* @throws IOException If some I/O error occurs
*/
public TsFileOnlineUpgradeTool(TsFileResource resourceToBeUpgraded) throws IOException {
- super(resourceToBeUpgraded);
- String file = oldTsFile.getAbsolutePath();
- reader = new TsFileSequenceReaderForV2(file);
+ super(resourceToBeUpgraded, true);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java
index 87dd910..f8e2b82 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java
@@ -122,16 +122,6 @@ public class UpgradeUtils {
upgradedResource.serialize();
// delete generated temp resource file
Files.delete(tempResourceFile.toPath());
- // delete tmp partition folder when it is empty
- File tmpPartitionDir = upgradedFile.getParentFile();
- if (tmpPartitionDir.isDirectory() && tmpPartitionDir.listFiles().length == 0) {
- Files.delete(tmpPartitionDir.toPath());
- }
- // delete upgrade folder when it is empty
- File upgradeDir = tmpPartitionDir.getParentFile();
- if (upgradeDir.isDirectory() && upgradeDir.listFiles().length == 0) {
- Files.delete(upgradeDir.toPath());
- }
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index 078df52..e66ed69 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -378,16 +378,32 @@ public class ChunkWriterImpl implements IChunkWriter {
* write the page header and data into the PageWriter's output stream. @NOTE: for upgrading
* 0.11/v2 to 0.12/v3 TsFile
*/
- public void writePageHeaderAndDataIntoBuff(
- ByteBuffer data, PageHeader header, boolean isOnlyOnePageChunk) throws PageException {
-
+ public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
+ throws PageException {
// write the page header to pageBuffer
try {
logger.debug(
"start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
- ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
- ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
- if (!isOnlyOnePageChunk) {
+ // serialize pageHeader see writePageToPageBuffer method
+ if (numOfPages == 0) { // record the firstPageStatistics
+ this.firstPageStatistics = header.getStatistics();
+ this.sizeWithoutStatistic +=
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ this.sizeWithoutStatistic +=
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+ byte[] b = pageBuffer.toByteArray();
+ pageBuffer.reset();
+ pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+ firstPageStatistics.serialize(pageBuffer);
+ pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ header.getStatistics().serialize(pageBuffer);
+ firstPageStatistics = null;
+ } else {
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
header.getStatistics().serialize(pageBuffer);
}
logger.debug(