You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/01 09:10:29 UTC
[iotdb] branch master updated: fix restart losing data when sender shuts down with data remaining in memtable (#5740)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c410efa8d2 fix restart losing data when sender shuts down with data remaining in memtable (#5740)
c410efa8d2 is described below
commit c410efa8d27a36f28019c7ffe066941135676a88
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Sun May 1 17:10:24 2022 +0800
fix restart losing data when sender shuts down with data remaining in memtable (#5740)
---
.../db/integration/sync/IoTDBSyncSenderIT.java | 56 +++++++++++++++++++++-
.../iotdb/db/engine/storagegroup/DataRegion.java | 3 ++
.../db/engine/storagegroup/TsFileManager.java | 2 +-
.../db/sync/sender/manager/TsFileSyncManager.java | 12 -----
.../iotdb/db/sync/sender/pipe/TsFilePipe.java | 13 ++++-
.../db/sync/sender/recovery/TsFilePipeLogger.java | 5 ++
6 files changed, 76 insertions(+), 15 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
index cacf6c6f57..b746793fdb 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
@@ -186,6 +186,19 @@ public class IoTDBSyncSenderIT {
Arrays.asList(simpleTsFilePipeData, simpleTsFilePipeData)); // del3 do not in history
}
+ private void prepareIns4() throws Exception { // ins unsealed tsfile
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "insert into root.sg1.d1(timestamp, s1, s2, s3) values(300, 300, 316.25, 'i')");
+ statement.execute(
+ "insert into root.sg1.d1(timestamp, s1, s2, s3) values(165, 165, 165.25, 'j')");
+ }
+
+ resultMap.put("ins4", Arrays.asList(simpleTsFilePipeData, simpleTsFilePipeData));
+ }
+
private void prepareDel1() throws Exception { // after ins1, add 2 deletions
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
@@ -239,7 +252,9 @@ public class IoTDBSyncSenderIT {
}
private void restart() throws Exception {
- EnvironmentUtils.restartDaemon();
+ // EnvironmentUtils.restartDaemon();
+ EnvironmentUtils.shutdownDaemon();
+ EnvironmentUtils.reactiveDaemon();
}
private void startPipe() throws Exception {
@@ -564,4 +579,43 @@ public class IoTDBSyncSenderIT {
}
}
}
+
+ @Test
+ public void testRestartWithUnsealedTsFile() {
+ try {
+ prepareSchema(); // history
+ prepareIns1();
+ prepareIns2();
+ prepareDel1();
+
+ preparePipeAndSetMock(); // realtime
+ startPipe();
+ stopPipe();
+ prepareDel2();
+ restart();
+ startPipe();
+ prepareIns3();
+ stopPipe();
+ prepareDel3();
+ prepareIns4();
+ startPipe();
+ restart();
+
+ Thread.sleep(1000L); // check
+ checkResult(
+ Arrays.asList("schema", "ins1", "ins2", "del2WithoutIns3", "ins3", "del3", "ins4"),
+ transportClient.getPipeDataList());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ try {
+ dropPipe();
+ Thread.sleep(1000L);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 40b2fa1659..1b2fbd1bfe 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -690,6 +690,9 @@ public class DataRegion {
TsFileResource tsFileResource = recoverPerformer.getTsFileResource();
if (!recoverPerformer.canWrite()) {
// cannot write, just close it
+ if (tsFileSyncManager.isEnableSync()) {
+ tsFileSyncManager.collectRealTimeTsFile(tsFileResource.getTsFile());
+ }
try {
tsFileResource.close();
} catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index e363b6765f..009e151d87 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -397,7 +397,7 @@ public class TsFileManager {
isRealTimeTsFile = tsFileProcessor.isMemtableNotNull();
}
File tsFile = tsFileResource.getTsFile();
- if (!isRealTimeTsFile && !syncManager.isTsFileAlreadyBeCollected(tsFile)) {
+ if (!isRealTimeTsFile) {
File mods = new File(tsFileResource.getModFile().getFilePath());
long modsOffset = mods.exists() ? mods.length() : 0L;
File hardlink = syncManager.createHardlink(tsFile, modsOffset);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java
index 3bb1bb6429..be298e8b59 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java
@@ -32,7 +32,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/**
* TsFileSyncManager is designed for collect all history TsFiles(i.e. before the pipe start time,
@@ -43,7 +42,6 @@ public class TsFileSyncManager {
private static final Logger logger = LoggerFactory.getLogger(TsFileSyncManager.class);
private TsFilePipe syncPipe;
- private ConcurrentHashMap<File, File> tsFilesAlreadyCollected; // used as concurrent hash set
/** singleton */
private TsFileSyncManager() {}
@@ -82,9 +80,6 @@ public class TsFileSyncManager {
public void collectRealTimeTsFile(File tsFile) {
syncPipe.collectRealTimeTsFile(tsFile);
- if (tsFilesAlreadyCollected != null) {
- tsFilesAlreadyCollected.put(tsFile, tsFile);
- }
}
public void collectRealTimeResource(File tsFile) {
@@ -93,8 +88,6 @@ public class TsFileSyncManager {
public List<File> registerAndCollectHistoryTsFile(TsFilePipe syncPipe, long dataStartTime) {
registerSyncTask(syncPipe);
- tsFilesAlreadyCollected =
- new ConcurrentHashMap<>(); // should be locked while containing multi pipes
List<File> historyTsFiles = new ArrayList<>();
Iterator<Map.Entry<PartialPath, StorageGroupManager>> sgIterator =
@@ -104,14 +97,9 @@ public class TsFileSyncManager {
sgIterator.next().getValue().collectHistoryTsFileForSync(dataStartTime));
}
- tsFilesAlreadyCollected = null;
return historyTsFiles;
}
- public boolean isTsFileAlreadyBeCollected(File tsFile) {
- return tsFilesAlreadyCollected.containsKey(tsFile);
- }
-
public File createHardlink(File tsFile, long modsOffset) {
return syncPipe.createHistoryTsFileHardlink(tsFile, modsOffset);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
index 3faefbfbdb..27cc16d712 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
@@ -174,13 +174,20 @@ public class TsFilePipe implements Pipe {
}
public File createHistoryTsFileHardlink(File tsFile, long modsOffset) {
+ collectRealTimeDataLock.lock(); // synchronize the pipeLog.isHardlinkExist
try {
+ if (pipeLog.isHardlinkExist(tsFile)) {
+ return null;
+ }
+
return pipeLog.createTsFileAndModsHardlink(tsFile, modsOffset);
} catch (IOException e) {
logger.error(
String.format("Create hardlink for history tsfile %s error.", tsFile.getPath()), e);
+ return null;
+ } finally {
+ collectRealTimeDataLock.unlock();
}
- return null;
}
public void collectRealTimeDeletion(Deletion deletion) {
@@ -212,6 +219,10 @@ public class TsFilePipe implements Pipe {
public void collectRealTimeTsFile(File tsFile) {
collectRealTimeDataLock.lock();
try {
+ if (pipeLog.isHardlinkExist(tsFile)) {
+ return;
+ }
+
maxSerialNumber += 1L;
File hardlink = pipeLog.createTsFileHardlink(tsFile);
PipeData tsFileData =
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java
index 40ea57e5bc..6aa4e3309c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java
@@ -49,6 +49,11 @@ public class TsFilePipeLogger {
tsFileDir = SyncPathUtil.getSenderFileDataDir(tsFilePipe.getName(), tsFilePipe.getCreateTime());
}
+ public boolean isHardlinkExist(File tsFile) {
+ File link = new File(tsFileDir, getRelativeFilePath(tsFile));
+ return link.exists();
+ }
+
/** make hard link for tsfile * */
public File createTsFileAndModsHardlink(File tsFile, long modsOffset) throws IOException {
File mods = new File(tsFile.getPath() + ModificationFile.FILE_SUFFIX);