You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/17 02:39:45 UTC

[incubator-iotdb] 01/01: fix flush

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_flush_close_file_error
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 9302734bfdc2bf0092abaf3f6a5bb406864a4f62
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Mar 17 10:39:25 2020 +0800

    fix flush
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  8 ++--
 .../iotdb/db/engine/flush/TsFileFlushPolicy.java   |  2 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 54 ++++++++++++----------
 .../db/engine/cache/DeviceMetaDataCacheTest.java   |  6 +--
 .../storagegroup/StorageGroupProcessorTest.java    | 28 +++++------
 .../iotdb/db/engine/storagegroup/TTLTest.java      | 10 ++--
 6 files changed, 55 insertions(+), 53 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 93df289..6b742d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -303,9 +303,7 @@ public class StorageEngine implements IService {
   public void syncCloseAllProcessor() {
     logger.info("Start closing all storage group processor");
     for (StorageGroupProcessor processor : processorMap.values()) {
-      processor.waitForAllCurrentTsFileProcessorsClosed();
-      //TODO do we need to wait for all merging tasks to be finished here?
-      processor.closeAllResources();
+      processor.syncCloseAllTsFileProcessors();
     }
   }
 
@@ -321,13 +319,13 @@ public class StorageEngine implements IService {
           // to avoid concurrent modification problem, we need a new array list
           for (TsFileProcessor tsfileProcessor : new ArrayList<>(
               processor.getWorkSequenceTsFileProcessors())) {
-            processor.moveOneWorkProcessorToClosingList(true, tsfileProcessor);
+            processor.asyncCloseOneTsFileProcessor(true, tsfileProcessor);
           }
         } else {
           // to avoid concurrent modification problem, we need a new array list
           for (TsFileProcessor tsfileProcessor : new ArrayList<>(
               processor.getWorkUnsequenceTsFileProcessor())) {
-            processor.moveOneWorkProcessorToClosingList(false, tsfileProcessor);
+            processor.asyncCloseOneTsFileProcessor(false, tsfileProcessor);
           }
         }
       } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
index 0b3b61b..7df2508 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
@@ -45,7 +45,7 @@ public interface TsFileFlushPolicy {
           tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
 
       if (tsFileProcessor.shouldClose()) {
-        storageGroupProcessor.moveOneWorkProcessorToClosingList(isSeq, tsFileProcessor);
+        storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
       } else {
         tsFileProcessor.asyncFlush();
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 72e05b1..03ac594 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -94,7 +94,7 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
  * (1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
  * shouldClose())<br/>
  * <p>
- * (2) someone calls waitForAllCurrentTsFileProcessorsClosed(). (up to now, only flush command from
+ * (2) someone calls syncCloseAllTsFileProcessors(). (up to now, only flush command from
  * cli will call this method)<br/>
  * <p>
  * UnSequence data has the similar process as above.
@@ -745,7 +745,7 @@ public class StorageGroupProcessor {
               tsFileProcessorTreeMap.size(),
               IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() / 2,
               storageGroupName);
-          moveOneWorkProcessorToClosingList(sequence, processorEntry.getValue());
+          asyncCloseOneTsFileProcessor(sequence, processorEntry.getValue());
         }
 
         // build new processor
@@ -820,8 +820,7 @@ public class StorageGroupProcessor {
   /**
    * thread-safety should be ensured by caller
    */
-  public void moveOneWorkProcessorToClosingList(boolean sequence,
-      TsFileProcessor tsFileProcessor) {
+  public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
     //for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
     //for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
     if (sequence) {
@@ -852,8 +851,8 @@ public class StorageGroupProcessor {
    */
   public void deleteFolder(String systemDir) {
     logger.info("{} will close all files for deleting data folder {}", storageGroupName, systemDir);
-    waitForAllCurrentTsFileProcessorsClosed();
     writeLock();
+    syncCloseAllTsFileProcessors();
     try {
       File storageGroupFolder = SystemFileFactory.INSTANCE.getFile(systemDir, storageGroupName);
       if (storageGroupFolder.exists()) {
@@ -885,7 +884,8 @@ public class StorageGroupProcessor {
 
   public void syncDeleteDataFiles() {
     logger.info("{} will close all files for deleting data files", storageGroupName);
-    waitForAllCurrentTsFileProcessorsClosed();
+    writeLock();
+    syncCloseAllTsFileProcessors();
     //normally, mergingModification is just need to be closed by after a merge task is finished.
     //we close it here just for IT test.
     if (this.mergingModification != null) {
@@ -896,7 +896,6 @@ public class StorageGroupProcessor {
       }
 
     }
-    writeLock();
     try {
       closeAllResources();
       List<String> folder = DirectoryManager.getInstance().getAllSequenceFileFolders();
@@ -995,39 +994,44 @@ public class StorageGroupProcessor {
   /**
    * This method will be blocked until all tsfile processors are closed.
    */
-  public void waitForAllCurrentTsFileProcessorsClosed() {
-    synchronized (closeStorageGroupCondition) {
-      try {
-        putAllWorkingTsFileProcessorIntoClosingList();
-        long startTime = System.currentTimeMillis();
-        while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor
-            .isEmpty()) {
-          closeStorageGroupCondition.wait(60_000);
-          if (System.currentTimeMillis() - startTime > 60_000) {
-            logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName,
-                (System.currentTimeMillis() - startTime)/1000);
+  public void syncCloseAllTsFileProcessors() {
+    writeLock();
+    try {
+      synchronized (closeStorageGroupCondition) {
+        try {
+          asyncCloseAllTsFileProcessors();
+          long startTime = System.currentTimeMillis();
+          while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor
+              .isEmpty()) {
+            closeStorageGroupCondition.wait(60_000);
+            if (System.currentTimeMillis() - startTime > 60_000) {
+              logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName,
+                  (System.currentTimeMillis() - startTime)/1000);
+            }
           }
+        } catch (InterruptedException e) {
+          logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage "
+              + "group {}", storageGroupName, e);
         }
-      } catch (InterruptedException e) {
-        logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage "
-            + "group {}", storageGroupName, e);
       }
+    } finally {
+      writeUnlock();
     }
   }
 
-  public void putAllWorkingTsFileProcessorIntoClosingList() {
+  public void asyncCloseAllTsFileProcessors() {
     writeLock();
     try {
       logger.info("async force close all files in storage group: {}", storageGroupName);
       // to avoid concurrent modification problem, we need a new array list
       for (TsFileProcessor tsFileProcessor : new ArrayList<>(
           workSequenceTsFileProcessors.values())) {
-        moveOneWorkProcessorToClosingList(true, tsFileProcessor);
+        asyncCloseOneTsFileProcessor(true, tsFileProcessor);
       }
       // to avoid concurrent modification problem, we need a new array list
       for (TsFileProcessor tsFileProcessor : new ArrayList<>(
           workUnsequenceTsFileProcessors.values())) {
-        moveOneWorkProcessorToClosingList(false, tsFileProcessor);
+        asyncCloseOneTsFileProcessor(false, tsFileProcessor);
       }
     } finally {
       writeUnlock();
@@ -1370,7 +1374,7 @@ public class StorageGroupProcessor {
       }
       logger.info("{} will close all files for starting a merge (fullmerge = {})", storageGroupName,
           fullMerge);
-      waitForAllCurrentTsFileProcessorsClosed();
+      syncCloseAllTsFileProcessors();
       if (unSequenceFileList.isEmpty() || sequenceFileTreeSet.isEmpty()) {
         logger.info("{} no files to be merged", storageGroupName);
         return;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
index ff05387..2f51e18 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
@@ -106,17 +106,17 @@ public class DeviceMetaDataCacheTest {
     for (int j = 11; j <= 20; j++) {
       insertOneRecord(j, j);
     }
-    storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+    storageGroupProcessor.asyncCloseAllTsFileProcessors();
 
     for (int j = 21; j <= 30; j += 2) {
       insertOneRecord(j, 0); // will be covered when read
     }
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllTsFileProcessors();
 
     for (int j = 21; j <= 30; j += 2) {
       insertOneRecord(j, j);
     }
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllTsFileProcessors();
 
     insertOneRecord(2, 100);
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index e86a985..ec645b3 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -89,7 +89,7 @@ public class StorageGroupProcessorTest {
     TSRecord record = new TSRecord(10000, deviceId);
     record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
     processor.insert(new InsertPlan(record));
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllTsFileProcessors();
 
     for (int j = 1; j <= 10; j++) {
       record = new TSRecord(j, deviceId);
@@ -137,10 +137,10 @@ public class StorageGroupProcessorTest {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllTsFileProcessors();
     }
 
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllTsFileProcessors();
     QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
         null, null);
 
@@ -178,7 +178,7 @@ public class StorageGroupProcessorTest {
     batchInsertPlan1.setRowCount(times.length);
 
     processor.insertBatch(batchInsertPlan1);
-    processor.putAllWorkingTsFileProcessorIntoClosingList();
+    processor.asyncCloseAllTsFileProcessors();
 
     BatchInsertPlan batchInsertPlan2 = new BatchInsertPlan("root.vehicle.d0", measurements,
         dataTypes);
@@ -193,8 +193,8 @@ public class StorageGroupProcessorTest {
     batchInsertPlan2.setRowCount(times.length);
 
     processor.insertBatch(batchInsertPlan2);
-    processor.putAllWorkingTsFileProcessorIntoClosingList();
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.asyncCloseAllTsFileProcessors();
+    processor.syncCloseAllTsFileProcessors();
 
     QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
         null, null);
@@ -214,18 +214,18 @@ public class StorageGroupProcessorTest {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllTsFileProcessors();
     }
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllTsFileProcessors();
 
     for (int j = 10; j >= 1; j--) {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllTsFileProcessors();
     }
 
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllTsFileProcessors();
 
     QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
         null, null);
@@ -247,18 +247,18 @@ public class StorageGroupProcessorTest {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllTsFileProcessors();
     }
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllTsFileProcessors();
 
     for (int j = 10; j >= 1; j--) {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllTsFileProcessors();
     }
 
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllTsFileProcessors();
     processor.merge(true);
     while (mergeLock.get() == 0) {
       // wait
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index d28006b..55eb0a8 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -83,7 +83,7 @@ public class TTLTest {
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllTsFileProcessors();
     EnvironmentUtils.cleanEnv();
   }
 
@@ -160,7 +160,7 @@ public class TTLTest {
       insertPlan.setTime(initTime - 2000 + i);
       storageGroupProcessor.insert(insertPlan);
       if ((i + 1) % 300 == 0) {
-        storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+        storageGroupProcessor.asyncCloseAllTsFileProcessors();
       }
     }
     // unsequence data
@@ -168,7 +168,7 @@ public class TTLTest {
       insertPlan.setTime(initTime - 2000 + i);
       storageGroupProcessor.insert(insertPlan);
       if ((i + 1) % 300 == 0) {
-        storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+        storageGroupProcessor.asyncCloseAllTsFileProcessors();
       }
     }
   }
@@ -225,7 +225,7 @@ public class TTLTest {
   public void testTTLRemoval() throws StorageEngineException, QueryProcessException {
     prepareData();
 
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllTsFileProcessors();
 
     // files before ttl
     File seqDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), sg1);
@@ -335,7 +335,7 @@ public class TTLTest {
   @Test
   public void testTTLCleanFile() throws QueryProcessException {
     prepareData();
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllTsFileProcessors();
 
     assertEquals(4, storageGroupProcessor.getSequenceFileTreeSet().size());
     assertEquals(4, storageGroupProcessor.getUnSequenceFileList().size());