You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/01 09:10:29 UTC

[iotdb] branch master updated: fix restart lossing data when sender shutdonws with data remain in memtable (#5740)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c410efa8d2 fix restart lossing data when sender shutdonws with data remain in memtable (#5740)
c410efa8d2 is described below

commit c410efa8d27a36f28019c7ffe066941135676a88
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Sun May 1 17:10:24 2022 +0800

    fix restart lossing data when sender shutdonws with data remain in memtable (#5740)
---
 .../db/integration/sync/IoTDBSyncSenderIT.java     | 56 +++++++++++++++++++++-
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  3 ++
 .../db/engine/storagegroup/TsFileManager.java      |  2 +-
 .../db/sync/sender/manager/TsFileSyncManager.java  | 12 -----
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      | 13 ++++-
 .../db/sync/sender/recovery/TsFilePipeLogger.java  |  5 ++
 6 files changed, 76 insertions(+), 15 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
index cacf6c6f57..b746793fdb 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
@@ -186,6 +186,19 @@ public class IoTDBSyncSenderIT {
         Arrays.asList(simpleTsFilePipeData, simpleTsFilePipeData)); // del3 do not in history
   }
 
+  private void prepareIns4() throws Exception { // ins unsealed tsfile
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          "insert into root.sg1.d1(timestamp, s1, s2, s3) values(300, 300, 316.25, 'i')");
+      statement.execute(
+          "insert into root.sg1.d1(timestamp, s1, s2, s3) values(165, 165, 165.25, 'j')");
+    }
+
+    resultMap.put("ins4", Arrays.asList(simpleTsFilePipeData, simpleTsFilePipeData));
+  }
+
   private void prepareDel1() throws Exception { // after ins1, add 2 deletions
     try (Connection connection =
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
@@ -239,7 +252,9 @@ public class IoTDBSyncSenderIT {
   }
 
   private void restart() throws Exception {
-    EnvironmentUtils.restartDaemon();
+    //    EnvironmentUtils.restartDaemon();
+    EnvironmentUtils.shutdownDaemon();
+    EnvironmentUtils.reactiveDaemon();
   }
 
   private void startPipe() throws Exception {
@@ -564,4 +579,43 @@ public class IoTDBSyncSenderIT {
       }
     }
   }
+
+  @Test
+  public void testRestartWithUnsealedTsFile() {
+    try {
+      prepareSchema(); // history
+      prepareIns1();
+      prepareIns2();
+      prepareDel1();
+
+      preparePipeAndSetMock(); // realtime
+      startPipe();
+      stopPipe();
+      prepareDel2();
+      restart();
+      startPipe();
+      prepareIns3();
+      stopPipe();
+      prepareDel3();
+      prepareIns4();
+      startPipe();
+      restart();
+
+      Thread.sleep(1000L); // check
+      checkResult(
+          Arrays.asList("schema", "ins1", "ins2", "del2WithoutIns3", "ins3", "del3", "ins4"),
+          transportClient.getPipeDataList());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      try {
+        dropPipe();
+        Thread.sleep(1000L);
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 40b2fa1659..1b2fbd1bfe 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -690,6 +690,9 @@ public class DataRegion {
     TsFileResource tsFileResource = recoverPerformer.getTsFileResource();
     if (!recoverPerformer.canWrite()) {
       // cannot write, just close it
+      if (tsFileSyncManager.isEnableSync()) {
+        tsFileSyncManager.collectRealTimeTsFile(tsFileResource.getTsFile());
+      }
       try {
         tsFileResource.close();
       } catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index e363b6765f..009e151d87 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -397,7 +397,7 @@ public class TsFileManager {
         isRealTimeTsFile = tsFileProcessor.isMemtableNotNull();
       }
       File tsFile = tsFileResource.getTsFile();
-      if (!isRealTimeTsFile && !syncManager.isTsFileAlreadyBeCollected(tsFile)) {
+      if (!isRealTimeTsFile) {
         File mods = new File(tsFileResource.getModFile().getFilePath());
         long modsOffset = mods.exists() ? mods.length() : 0L;
         File hardlink = syncManager.createHardlink(tsFile, modsOffset);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java
index 3bb1bb6429..be298e8b59 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java
@@ -32,7 +32,6 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * TsFileSyncManager is designed for collect all history TsFiles(i.e. before the pipe start time,
@@ -43,7 +42,6 @@ public class TsFileSyncManager {
   private static final Logger logger = LoggerFactory.getLogger(TsFileSyncManager.class);
 
   private TsFilePipe syncPipe;
-  private ConcurrentHashMap<File, File> tsFilesAlreadyCollected; // used as concurrent hash set
 
   /** singleton */
   private TsFileSyncManager() {}
@@ -82,9 +80,6 @@ public class TsFileSyncManager {
 
   public void collectRealTimeTsFile(File tsFile) {
     syncPipe.collectRealTimeTsFile(tsFile);
-    if (tsFilesAlreadyCollected != null) {
-      tsFilesAlreadyCollected.put(tsFile, tsFile);
-    }
   }
 
   public void collectRealTimeResource(File tsFile) {
@@ -93,8 +88,6 @@ public class TsFileSyncManager {
 
   public List<File> registerAndCollectHistoryTsFile(TsFilePipe syncPipe, long dataStartTime) {
     registerSyncTask(syncPipe);
-    tsFilesAlreadyCollected =
-        new ConcurrentHashMap<>(); // should be locked while containing multi pipes
 
     List<File> historyTsFiles = new ArrayList<>();
     Iterator<Map.Entry<PartialPath, StorageGroupManager>> sgIterator =
@@ -104,14 +97,9 @@ public class TsFileSyncManager {
           sgIterator.next().getValue().collectHistoryTsFileForSync(dataStartTime));
     }
 
-    tsFilesAlreadyCollected = null;
     return historyTsFiles;
   }
 
-  public boolean isTsFileAlreadyBeCollected(File tsFile) {
-    return tsFilesAlreadyCollected.containsKey(tsFile);
-  }
-
   public File createHardlink(File tsFile, long modsOffset) {
     return syncPipe.createHistoryTsFileHardlink(tsFile, modsOffset);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
index 3faefbfbdb..27cc16d712 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
@@ -174,13 +174,20 @@ public class TsFilePipe implements Pipe {
   }
 
   public File createHistoryTsFileHardlink(File tsFile, long modsOffset) {
+    collectRealTimeDataLock.lock(); // synchronize the pipeLog.isHardlinkExist
     try {
+      if (pipeLog.isHardlinkExist(tsFile)) {
+        return null;
+      }
+
       return pipeLog.createTsFileAndModsHardlink(tsFile, modsOffset);
     } catch (IOException e) {
       logger.error(
           String.format("Create hardlink for history tsfile %s error.", tsFile.getPath()), e);
+      return null;
+    } finally {
+      collectRealTimeDataLock.unlock();
     }
-    return null;
   }
 
   public void collectRealTimeDeletion(Deletion deletion) {
@@ -212,6 +219,10 @@ public class TsFilePipe implements Pipe {
   public void collectRealTimeTsFile(File tsFile) {
     collectRealTimeDataLock.lock();
     try {
+      if (pipeLog.isHardlinkExist(tsFile)) {
+        return;
+      }
+
       maxSerialNumber += 1L;
       File hardlink = pipeLog.createTsFileHardlink(tsFile);
       PipeData tsFileData =
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java
index 40ea57e5bc..6aa4e3309c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java
@@ -49,6 +49,11 @@ public class TsFilePipeLogger {
     tsFileDir = SyncPathUtil.getSenderFileDataDir(tsFilePipe.getName(), tsFilePipe.getCreateTime());
   }
 
+  public boolean isHardlinkExist(File tsFile) {
+    File link = new File(tsFileDir, getRelativeFilePath(tsFile));
+    return link.exists();
+  }
+
   /** make hard link for tsfile * */
   public File createTsFileAndModsHardlink(File tsFile, long modsOffset) throws IOException {
     File mods = new File(tsFile.getPath() + ModificationFile.FILE_SUFFIX);