You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/15 07:02:01 UTC

[iotdb] branch master updated: [IOTDB-4896] Fix error in closing a TsFileProcessor with an empty memtable (#7971)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e46df71015 [IOTDB-4896] Fix error in closing a TsFileProcessor with an empty memtable (#7971)
e46df71015 is described below

commit e46df7101597f6f295a72480260efec8a1bc202f
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Tue Nov 15 15:01:56 2022 +0800

    [IOTDB-4896] Fix error in closing a TsFileProcessor with an empty memtable (#7971)
---
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 31 ++++++++-------
 .../engine/storagegroup/HashLastFlushTimeMap.java  |  3 +-
 .../storagegroup/IDTableLastFlushTimeMap.java      |  3 +-
 .../db/engine/storagegroup/ILastFlushTimeMap.java  |  2 +-
 .../db/engine/storagegroup/TsFileProcessor.java    | 46 ++++++++++++++++++----
 .../db/engine/storagegroup/DataRegionTest.java     | 23 ++++++++++-
 .../engine/storagegroup/TsFileProcessorV2Test.java | 12 +++---
 7 files changed, 85 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 3bcf0a916e..e0864da1cc 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1379,6 +1379,8 @@ public class DataRegion {
     logger.info(
         "Async close tsfile: {}",
         tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
+
+    boolean isEmptyFile = tsFileProcessor.isEmpty();
     if (sequence) {
       closingSequenceTsFileProcessor.add(tsFileProcessor);
       tsFileProcessor.asyncClose();
@@ -1401,6 +1403,16 @@ public class DataRegion {
         timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
       }
     }
+    if (isEmptyFile) {
+      try {
+        fsFactory.deleteIfExists(tsFileProcessor.getTsFileResource().getTsFile());
+        tsFileManager.remove(tsFileProcessor.getTsFileResource(), sequence);
+      } catch (IOException e) {
+        logger.error(
+            "Remove empty file {} error",
+            tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
+      }
+    }
   }
 
   /**
@@ -2059,7 +2071,7 @@ public class DataRegion {
     }
   }
 
-  private boolean unsequenceFlushCallback(
+  private void unsequenceFlushCallback(
       TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
     TimePartitionManager.getInstance()
         .updateAfterFlushing(
@@ -2068,21 +2080,11 @@ public class DataRegion {
             systemFlushTime,
             lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
             workSequenceTsFileProcessors.get(processor.getTimeRangeId()) != null);
-    return true;
   }
 
-  private boolean sequenceFlushCallback(
+  private void sequenceFlushCallback(
       TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
-    boolean res = lastFlushTimeMap.updateLatestFlushTime(processor.getTimeRangeId(), updateMap);
-    if (!res) {
-      logger.warn(
-          "Partition: {} does't have latest time for each device. "
-              + "No valid record is written into memtable. Flushing tsfile is: {}",
-          processor.getTimeRangeId(),
-          processor.getTsFileResource().getTsFile());
-      return res;
-    }
-
+    lastFlushTimeMap.updateLatestFlushTime(processor.getTimeRangeId(), updateMap);
     TimePartitionManager.getInstance()
         .updateAfterFlushing(
             new DataRegionId(Integer.valueOf(dataRegionId)),
@@ -2090,7 +2092,6 @@ public class DataRegion {
             systemFlushTime,
             lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
             workUnsequenceTsFileProcessors.get(processor.getTimeRangeId()) != null);
-    return res;
   }
 
   /** used for upgrading */
@@ -3276,7 +3277,7 @@ public class DataRegion {
   @FunctionalInterface
   public interface UpdateEndTimeCallBack {
 
-    boolean call(TsFileProcessor caller, Map<String, Long> updateMap, long systemFlushTime);
+    void call(TsFileProcessor caller, Map<String, Long> updateMap, long systemFlushTime);
   }
 
   @FunctionalInterface
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
index b110cd6768..020077bb6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
@@ -181,7 +181,7 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap {
   }
 
   @Override
-  public boolean updateLatestFlushTime(long partitionId, Map<String, Long> updateMap) {
+  public void updateLatestFlushTime(long partitionId, Map<String, Long> updateMap) {
     for (Map.Entry<String, Long> entry : updateMap.entrySet()) {
       partitionLatestFlushedTimeForEachDevice
           .computeIfAbsent(partitionId, id -> new HashMap<>())
@@ -193,7 +193,6 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap {
         globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
       }
     }
-    return true;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
index 8e690290cd..6ad437fdc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
@@ -113,7 +113,7 @@ public class IDTableLastFlushTimeMap implements ILastFlushTimeMap {
   }
 
   @Override
-  public boolean updateLatestFlushTime(long partitionId, Map<String, Long> updateMap) {
+  public void updateLatestFlushTime(long partitionId, Map<String, Long> updateMap) {
     for (Map.Entry<String, Long> entry : updateMap.entrySet()) {
       DeviceEntry deviceEntry = idTable.getDeviceEntry(entry.getKey());
       deviceEntry.updateFlushTimeMap(partitionId, entry.getValue());
@@ -121,7 +121,6 @@ public class IDTableLastFlushTimeMap implements ILastFlushTimeMap {
         deviceEntry.setGlobalFlushTime(entry.getValue());
       }
     }
-    return true;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
index a98dd06e6f..f344b73f31 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
@@ -52,7 +52,7 @@ public interface ILastFlushTimeMap {
   // region support upgrade methods
   void applyNewlyFlushedTimeToFlushedTime();
 
-  boolean updateLatestFlushTime(long partitionId, Map<String, Long> updateMap);
+  void updateLatestFlushTime(long partitionId, Map<String, Long> updateMap);
   // endregion
 
   // region query
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 853484d51b..e59c593604 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -798,7 +798,6 @@ public class TsFileProcessor {
           FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
     }
     try {
-
       if (logger.isInfoEnabled()) {
         if (workMemTable != null) {
           logger.info(
@@ -833,7 +832,7 @@ public class TsFileProcessor {
       // we have to add the memtable into flushingList first and then set the shouldClose tag.
       // see https://issues.apache.org/jira/browse/IOTDB-510
       IMemTable tmpMemTable =
-          workMemTable == null || workMemTable.memSize() == 0
+          workMemTable == null || workMemTable.getTotalPointsNum() == 0
               ? new NotifyFlushMemTable()
               : workMemTable;
 
@@ -845,8 +844,12 @@ public class TsFileProcessor {
         }
         // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
         // flushing memTable in System module.
-        addAMemtableIntoFlushingList(tmpMemTable);
-        logger.info("Memtable {} has been added to flushing list", tmpMemTable);
+        if (totalMemTableSize == 0 && tmpMemTable.getTotalPointsNum() == 0) {
+          endEmptyFile();
+        } else {
+          addAMemtableIntoFlushingList(tmpMemTable);
+          logger.info("Memtable {} has been added to flushing list", tmpMemTable);
+        }
         shouldClose = true;
       } catch (Exception e) {
         logger.error(
@@ -956,10 +959,10 @@ public class TsFileProcessor {
       lastTimeForEachDevice = tobeFlushed.getMaxTime();
       tsFileResource.updateEndTime(lastTimeForEachDevice);
     }
-    if (!tobeFlushed.isSignalMemTable()
-        && (tobeFlushed.memSize() == 0
-            || (!updateLatestFlushTimeCallback.call(
-                this, lastTimeForEachDevice, System.currentTimeMillis())))) {
+
+    updateLatestFlushTimeCallback.call(this, lastTimeForEachDevice, System.currentTimeMillis());
+
+    if (!tobeFlushed.isSignalMemTable() && tobeFlushed.getTotalPointsNum() == 0) {
       logger.warn(
           "This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}",
           storageGroupName,
@@ -1303,6 +1306,28 @@ public class TsFileProcessor {
     writer = null;
   }
 
+  /** end empty file and remove it from file system */
+  private void endEmptyFile() throws TsFileProcessorException, IOException {
+    logger.info("Start to end empty file {}", tsFileResource);
+
+    // remove this processor from Closing list in DataRegion,
+    // mark the TsFileResource closed, no need writer anymore
+    writer.close();
+    for (CloseFileListener closeFileListener : closeFileListeners) {
+      closeFileListener.onClosed(this);
+    }
+    if (enableMemControl) {
+      tsFileProcessorInfo.clear();
+      dataRegionInfo.closeTsFileProcessorAndReportToSystem(this);
+    }
+    logger.info(
+        "Storage group {} close and remove empty file {}",
+        storageGroupName,
+        tsFileResource.getTsFile().getAbsoluteFile());
+
+    writer = null;
+  }
+
   public boolean isManagedByFlushManager() {
     return managedByFlushManager;
   }
@@ -1496,6 +1521,11 @@ public class TsFileProcessor {
     }
   }
 
+  public boolean isEmpty() {
+    return totalMemTableSize == 0
+        && (workMemTable == null || workMemTable.getTotalPointsNum() == 0);
+  }
+
   @TestOnly
   public IMemTable getWorkMemTable() {
     return workMemTable;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
index 8a5888601e..58f7311635 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
@@ -1087,7 +1087,7 @@ public class DataRegionTest {
     // delete data which is in flushing memtable
     dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 100, 0, null);
     dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 150, 0, null);
-    dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 300, 0, null);
+    dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 190, 0, null);
 
     dataRegion.syncCloseAllWorkingTsFileProcessors();
     Assert.assertTrue(tsFileResource.getModFile().exists());
@@ -1150,6 +1150,27 @@ public class DataRegionTest {
     Assert.assertEquals(3, tsFileResource.getModFile().getModifications().size());
   }
 
+  @Test
+  public void testFlushingEmptyMemtable()
+      throws IllegalPathException, WriteProcessException, TriggerExecutionException, IOException {
+    for (int j = 100; j < 200; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    }
+    TsFileResource tsFileResource = dataRegion.getTsFileManager().getTsFileList(true).get(0);
+
+    // delete all data which is in flushing memtable
+    dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 200, 0, null);
+
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+    Assert.assertFalse(tsFileResource.getTsFile().exists());
+    Assert.assertFalse(tsFileResource.getModFile().exists());
+    Assert.assertFalse(dataRegion.getTsFileManager().contains(tsFileResource, true));
+    Assert.assertFalse(
+        dataRegion.getWorkSequenceTsFileProcessors().contains(tsFileResource.getProcessor()));
+  }
+
   static class DummyDataRegion extends DataRegion {
 
     DummyDataRegion(String systemInfoDir, String storageGroupName) throws DataRegionException {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
index ca944ce80c..9840b66da7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
@@ -110,7 +110,7 @@ public class TsFileProcessorV2Test {
             SystemFileFactory.INSTANCE.getFile(filePath),
             sgInfo,
             this::closeTsFileProcessor,
-            (tsFileProcessor, updateMap, systemFlushTime) -> true,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
             true);
 
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -169,7 +169,7 @@ public class TsFileProcessorV2Test {
             SystemFileFactory.INSTANCE.getFile(filePath),
             sgInfo,
             this::closeTsFileProcessor,
-            (tsFileProcessor, updateMap, systemFlushTime) -> true,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
             true);
 
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -256,7 +256,7 @@ public class TsFileProcessorV2Test {
             SystemFileFactory.INSTANCE.getFile(filePath),
             sgInfo,
             this::closeTsFileProcessor,
-            (tsFileProcessor, updateMap, systemFlushTime) -> true,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
             true);
 
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -299,7 +299,7 @@ public class TsFileProcessorV2Test {
             SystemFileFactory.INSTANCE.getFile(filePath),
             sgInfo,
             this::closeTsFileProcessor,
-            (tsFileProcessor, updateMap, systemFlushTime) -> true,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
             true);
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -335,7 +335,7 @@ public class TsFileProcessorV2Test {
             SystemFileFactory.INSTANCE.getFile(filePath),
             sgInfo,
             this::closeTsFileProcessor,
-            (tsFileProcessor, updateMap, systemFlushTime) -> true,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
             true);
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -371,7 +371,7 @@ public class TsFileProcessorV2Test {
             SystemFileFactory.INSTANCE.getFile(filePath),
             sgInfo,
             this::closeTsFileProcessor,
-            (tsFileProcessor, updateMap, systemFlushTime) -> true,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
             true);
 
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);