You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/20 06:24:58 UTC
[incubator-iotdb] branch master updated: add vm level test and fix bug (#1522)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0d4492c add vm level test and fix bug (#1522)
0d4492c is described below
commit 0d4492c066e7b806d9bd0ec59660f08f6687edce
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Mon Jul 20 14:24:42 2020 +0800
add vm level test and fix bug (#1522)
* fix vm recover bug
Co-authored-by: 张凌哲 <zh...@bytedance.com>
---
.../engine/storagegroup/StorageGroupProcessor.java | 2 +-
.../db/engine/storagegroup/TsFileProcessor.java | 26 ++++++++++++++++++----
.../writelog/recover/TsFileRecoverPerformer.java | 3 ++-
.../storagegroup/TsFileProcessorEnableVmTest.java | 7 ++++++
4 files changed, 32 insertions(+), 6 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 6a30c5f..db890bc 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -585,7 +585,7 @@ public class StorageGroupProcessor {
public static int getVmLevel(File file) {
String vmLevelStr = file.getPath()
.substring(file.getPath().lastIndexOf(TSFILE_SUFFIX)).replaceAll(TSFILE_SUFFIX, "")
- .split(IoTDBConstant.FILE_NAME_SEPARATOR)[0];
+ .split(IoTDBConstant.FILE_NAME_SEPARATOR)[1];
return Integer.parseInt(vmLevelStr);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 9031941..c5b687c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -605,7 +605,7 @@ public class TsFileProcessor {
}
}
- public File createNewVMFile(TsFileResource tsFileResource, int level) {
+ public File createNewVMFileWithLock(TsFileResource tsFileResource, int level) {
vmFileCreateLock.writeLock().lock();
try {
TimeUnit.MILLISECONDS.sleep(1);
@@ -624,6 +624,22 @@ public class TsFileProcessor {
}
}
+ public static File createNewVMFile(TsFileResource tsFileResource, int level) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(1);
+ File parent = tsFileResource.getTsFile().getParentFile();
+ return FSFactoryProducer.getFSFactory().getFile(parent,
+ tsFileResource.getTsFile().getName() + IoTDBConstant.FILE_NAME_SEPARATOR + level
+ + IoTDBConstant.FILE_NAME_SEPARATOR + System
+ .currentTimeMillis() + VM_SUFFIX);
+ } catch (InterruptedException e) {
+ logger.error("{}: {}, closing task is interrupted.",
+ tsFileResource.getTsFile().getParent(), tsFileResource.getTsFile().getName(), e);
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ }
+
private void deleteVmFiles(List<TsFileResource> vmMergeTsFiles,
List<RestorableTsFileIOWriter> vmMergeWriters) throws IOException {
logger.debug("{}: {} vm merge starts to delete file", storageGroupName,
@@ -693,7 +709,7 @@ public class TsFileProcessor {
}
int level = getVmLevel(sourceFileList.get(0));
if (isMergeFinished) {
- File newVmFile = createNewVMFile(tsFileResource, level + 1);
+ File newVmFile = createNewVMFileWithLock(tsFileResource, level + 1);
if (!targetFile.renameTo(newVmFile)) {
logger.error("Failed to rename {} to {}", targetFile, newVmFile);
} else {
@@ -760,7 +776,7 @@ public class TsFileProcessor {
if (config.isEnableVm()) {
logger.info("{}: {} [Flush] start to flush a memtable to a vm", storageGroupName,
tsFileResource.getTsFile().getName());
- File newVmFile = createNewVMFile(tsFileResource, 0);
+ File newVmFile = createNewVMFileWithLock(tsFileResource, 0);
if (vmWriters.isEmpty()) {
vmWriters.add(new ArrayList<>());
vmTsFileResources.add(new ArrayList<>());
@@ -1111,9 +1127,11 @@ public class TsFileProcessor {
List<List<TsFileResource>> currMergeVmFiles, VmLogger vmLogger) throws IOException {
VmMergeUtils.merge(writer, packVmWritersToSequenceList(currMergeVmWriters),
storageGroupName, vmLogger, new HashSet<>(), sequence);
+ vmMergeLock.writeLock().lock();
for (int i = 0; i < currMergeVmFiles.size(); i++) {
deleteVmFiles(currMergeVmFiles.get(i), currMergeVmWriters.get(i));
}
+ vmMergeLock.writeLock().unlock();
}
private List<RestorableTsFileIOWriter> packVmWritersToSequenceList(
@@ -1187,7 +1205,7 @@ public class TsFileProcessor {
for (RestorableTsFileIOWriter vmWriter : vmMergeWriters.get(i)) {
vmLogger.logFile(SOURCE_NAME, vmWriter.getFile());
}
- File newVmFile = createNewVMFile(tsFileResource, i + 1);
+ File newVmFile = createNewVMFileWithLock(tsFileResource, i + 1);
vmLogger.logFile(TARGET_NAME, newVmFile);
logger.info("{}: {} [Hot Compaction] merge level-{}'s {} vms to next level vm",
storageGroupName, tsFileResource.getTsFile().getName(), i,
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 1a146d6..d75de25 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.writelog.recover;
import static org.apache.iotdb.db.engine.flush.MemTableFlushTask.getFlushLogFile;
import static org.apache.iotdb.db.engine.flush.VmLogger.isVMLoggerFileExist;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileProcessor.createNewVMFile;
import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SUFFIX;
import java.io.File;
@@ -176,7 +177,7 @@ public class TsFileRecoverPerformer {
if (tsFileNotCrashed) {
// if wal exists, we should open a new vmfile to replay it
- File newVmFile = resource.getProcessor().createNewVMFile(resource, 0);
+ File newVmFile = createNewVMFile(resource, 0);
TsFileResource newVmTsFileResource = new TsFileResource(newVmFile);
RestorableTsFileIOWriter newVMWriter = new RestorableTsFileIOWriter(newVmFile);
if (redoLogs(newVMWriter, newVmTsFileResource)) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorEnableVmTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorEnableVmTest.java
index 1af2794..7b8044d 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorEnableVmTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorEnableVmTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.storagegroup;
import static junit.framework.TestCase.assertTrue;
+import static org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.getVmLevel;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -127,6 +128,12 @@ public class TsFileProcessorEnableVmTest {
tsfileResourcesForQuery.clear();
processor.query(deviceId, measurementId, dataType, encoding, props, context,
tsfileResourcesForQuery);
+ List<List<TsFileResource>> tsfileResources = processor.getVmTsFileResources();
+ for (List<TsFileResource> levelResources : tsfileResources) {
+ for (TsFileResource resource : levelResources) {
+ assertEquals(0, getVmLevel(resource.getTsFile()));
+ }
+ }
assertEquals(1, tsfileResourcesForQuery.size());
assertEquals(1, tsfileResourcesForQuery.get(0).getChunkMetadataList().size());