You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/17 05:46:45 UTC

[incubator-iotdb] branch master updated: fix execute flush command while inserting bug (#916)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8681bb2  fix execute flush command while inserting bug (#916)
8681bb2 is described below

commit 8681bb2d53c094086e8586a05959f5f5f8612896
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Tue Mar 17 13:46:39 2020 +0800

    fix execute flush command while inserting bug (#916)
    
    * fix flush command bug
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  8 ++---
 .../iotdb/db/engine/flush/TsFileFlushPolicy.java   |  2 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 39 +++++++++++-----------
 .../db/engine/cache/DeviceMetaDataCacheTest.java   |  6 ++--
 .../storagegroup/StorageGroupProcessorTest.java    | 28 ++++++++--------
 .../iotdb/db/engine/storagegroup/TTLTest.java      | 10 +++---
 6 files changed, 45 insertions(+), 48 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 93df289..6bd9b14 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -303,9 +303,7 @@ public class StorageEngine implements IService {
   public void syncCloseAllProcessor() {
     logger.info("Start closing all storage group processor");
     for (StorageGroupProcessor processor : processorMap.values()) {
-      processor.waitForAllCurrentTsFileProcessorsClosed();
-      //TODO do we need to wait for all merging tasks to be finished here?
-      processor.closeAllResources();
+      processor.syncCloseAllWorkingTsFileProcessors();
     }
   }
 
@@ -321,13 +319,13 @@ public class StorageEngine implements IService {
           // to avoid concurrent modification problem, we need a new array list
           for (TsFileProcessor tsfileProcessor : new ArrayList<>(
               processor.getWorkSequenceTsFileProcessors())) {
-            processor.moveOneWorkProcessorToClosingList(true, tsfileProcessor);
+            processor.asyncCloseOneTsFileProcessor(true, tsfileProcessor);
           }
         } else {
           // to avoid concurrent modification problem, we need a new array list
           for (TsFileProcessor tsfileProcessor : new ArrayList<>(
               processor.getWorkUnsequenceTsFileProcessor())) {
-            processor.moveOneWorkProcessorToClosingList(false, tsfileProcessor);
+            processor.asyncCloseOneTsFileProcessor(false, tsfileProcessor);
           }
         }
       } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
index 0b3b61b..7df2508 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
@@ -45,7 +45,7 @@ public interface TsFileFlushPolicy {
           tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
 
       if (tsFileProcessor.shouldClose()) {
-        storageGroupProcessor.moveOneWorkProcessorToClosingList(isSeq, tsFileProcessor);
+        storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
       } else {
         tsFileProcessor.asyncFlush();
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 72e05b1..674c0fd 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -94,7 +94,7 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
  * (1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
  * shouldClose())<br/>
  * <p>
- * (2) someone calls waitForAllCurrentTsFileProcessorsClosed(). (up to now, only flush command from
+ * (2) someone calls syncCloseAllWorkingTsFileProcessors(). (up to now, only flush command from
  * cli will call this method)<br/>
  * <p>
  * UnSequence data has the similar process as above.
@@ -102,7 +102,7 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
  * When a sequence TsFileProcessor is submitted to be flushed, the updateLatestFlushTimeCallback()
  * method will be called as a callback.<br/>
  * <p>
- * When a TsFileProcessor is closed, the closeUnsealedTsFileProcessor() method will be called as a
+ * When a TsFileProcessor is closed, the closeUnsealedTsFileProcessorCallBack() method will be called as a
  * callback.
  */
 public class StorageGroupProcessor {
@@ -386,7 +386,7 @@ public class StorageGroupProcessor {
         // the last file is not closed, continue writing to in
         TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
             schema, getVersionControllerByTimePartitionId(timePartitionId),
-            this::closeUnsealedTsFileProcessor,
+            this::closeUnsealedTsFileProcessorCallBack,
             this::updateLatestFlushTimeCallback, true, writer);
         workUnsequenceTsFileProcessors
             .put(timePartitionId, tsFileProcessor);
@@ -415,7 +415,7 @@ public class StorageGroupProcessor {
         // the last file is not closed, continue writing to in
         TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
             schema, getVersionControllerByTimePartitionId(timePartitionId),
-            this::closeUnsealedTsFileProcessor,
+            this::closeUnsealedTsFileProcessorCallBack,
             this::unsequenceFlushCallback, false, writer);
         tsFileResource.setProcessor(tsFileProcessor);
         tsFileProcessor.setTimeRangeId(timePartitionId);
@@ -745,7 +745,7 @@ public class StorageGroupProcessor {
               tsFileProcessorTreeMap.size(),
               IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() / 2,
               storageGroupName);
-          moveOneWorkProcessorToClosingList(sequence, processorEntry.getValue());
+          asyncCloseOneTsFileProcessor(sequence, processorEntry.getValue());
         }
 
         // build new processor
@@ -787,12 +787,12 @@ public class StorageGroupProcessor {
     if (sequence) {
       tsFileProcessor = new TsFileProcessor(storageGroupName,
           fsFactory.getFileWithParent(filePath),
-          schema, versionController, this::closeUnsealedTsFileProcessor,
+          schema, versionController, this::closeUnsealedTsFileProcessorCallBack,
           this::updateLatestFlushTimeCallback, true);
     } else {
       tsFileProcessor = new TsFileProcessor(storageGroupName,
           fsFactory.getFileWithParent(filePath),
-          schema, versionController, this::closeUnsealedTsFileProcessor,
+          schema, versionController, this::closeUnsealedTsFileProcessorCallBack,
           this::unsequenceFlushCallback, false);
     }
 
@@ -820,8 +820,7 @@ public class StorageGroupProcessor {
   /**
    * thread-safety should be ensured by caller
    */
-  public void moveOneWorkProcessorToClosingList(boolean sequence,
-      TsFileProcessor tsFileProcessor) {
+  public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
     //for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
     //for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
     if (sequence) {
@@ -852,8 +851,8 @@ public class StorageGroupProcessor {
    */
   public void deleteFolder(String systemDir) {
     logger.info("{} will close all files for deleting data folder {}", storageGroupName, systemDir);
-    waitForAllCurrentTsFileProcessorsClosed();
     writeLock();
+    syncCloseAllWorkingTsFileProcessors();
     try {
       File storageGroupFolder = SystemFileFactory.INSTANCE.getFile(systemDir, storageGroupName);
       if (storageGroupFolder.exists()) {
@@ -885,7 +884,8 @@ public class StorageGroupProcessor {
 
   public void syncDeleteDataFiles() {
     logger.info("{} will close all files for deleting data files", storageGroupName);
-    waitForAllCurrentTsFileProcessorsClosed();
+    writeLock();
+    syncCloseAllWorkingTsFileProcessors();
     //normally, mergingModification is just need to be closed by after a merge task is finished.
     //we close it here just for IT test.
     if (this.mergingModification != null) {
@@ -896,7 +896,6 @@ public class StorageGroupProcessor {
       }
 
     }
-    writeLock();
     try {
       closeAllResources();
       List<String> folder = DirectoryManager.getInstance().getAllSequenceFileFolders();
@@ -995,17 +994,17 @@ public class StorageGroupProcessor {
   /**
    * This method will be blocked until all tsfile processors are closed.
    */
-  public void waitForAllCurrentTsFileProcessorsClosed() {
+  public void syncCloseAllWorkingTsFileProcessors() {
     synchronized (closeStorageGroupCondition) {
       try {
-        putAllWorkingTsFileProcessorIntoClosingList();
+        asyncCloseAllWorkingTsFileProcessors();
         long startTime = System.currentTimeMillis();
         while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor
             .isEmpty()) {
           closeStorageGroupCondition.wait(60_000);
           if (System.currentTimeMillis() - startTime > 60_000) {
             logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName,
-                (System.currentTimeMillis() - startTime)/1000);
+                (System.currentTimeMillis() - startTime) / 1000);
           }
         }
       } catch (InterruptedException e) {
@@ -1015,19 +1014,19 @@ public class StorageGroupProcessor {
     }
   }
 
-  public void putAllWorkingTsFileProcessorIntoClosingList() {
+  public void asyncCloseAllWorkingTsFileProcessors() {
     writeLock();
     try {
       logger.info("async force close all files in storage group: {}", storageGroupName);
       // to avoid concurrent modification problem, we need a new array list
       for (TsFileProcessor tsFileProcessor : new ArrayList<>(
           workSequenceTsFileProcessors.values())) {
-        moveOneWorkProcessorToClosingList(true, tsFileProcessor);
+        asyncCloseOneTsFileProcessor(true, tsFileProcessor);
       }
       // to avoid concurrent modification problem, we need a new array list
       for (TsFileProcessor tsFileProcessor : new ArrayList<>(
           workUnsequenceTsFileProcessors.values())) {
-        moveOneWorkProcessorToClosingList(false, tsFileProcessor);
+        asyncCloseOneTsFileProcessor(false, tsFileProcessor);
       }
     } finally {
       writeUnlock();
@@ -1309,7 +1308,7 @@ public class StorageGroupProcessor {
    * put the memtable back to the MemTablePool and make the metadata in writer visible
    */
   // TODO please consider concurrency with query and insert method.
-  private void closeUnsealedTsFileProcessor(
+  private void closeUnsealedTsFileProcessorCallBack(
       TsFileProcessor tsFileProcessor) throws TsFileProcessorException {
     closeQueryLock.writeLock().lock();
     try {
@@ -1370,7 +1369,7 @@ public class StorageGroupProcessor {
       }
       logger.info("{} will close all files for starting a merge (fullmerge = {})", storageGroupName,
           fullMerge);
-      waitForAllCurrentTsFileProcessorsClosed();
+      syncCloseAllWorkingTsFileProcessors();
       if (unSequenceFileList.isEmpty() || sequenceFileTreeSet.isEmpty()) {
         logger.info("{} no files to be merged", storageGroupName);
         return;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
index ff05387..a46be2a 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
@@ -106,17 +106,17 @@ public class DeviceMetaDataCacheTest {
     for (int j = 11; j <= 20; j++) {
       insertOneRecord(j, j);
     }
-    storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+    storageGroupProcessor.asyncCloseAllWorkingTsFileProcessors();
 
     for (int j = 21; j <= 30; j += 2) {
       insertOneRecord(j, 0); // will be covered when read
     }
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
 
     for (int j = 21; j <= 30; j += 2) {
       insertOneRecord(j, j);
     }
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
 
     insertOneRecord(2, 100);
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index e86a985..0abeb56 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -89,7 +89,7 @@ public class StorageGroupProcessorTest {
     TSRecord record = new TSRecord(10000, deviceId);
     record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
     processor.insert(new InsertPlan(record));
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllWorkingTsFileProcessors();
 
     for (int j = 1; j <= 10; j++) {
       record = new TSRecord(j, deviceId);
@@ -137,10 +137,10 @@ public class StorageGroupProcessorTest {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllWorkingTsFileProcessors();
     }
 
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllWorkingTsFileProcessors();
     QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
         null, null);
 
@@ -178,7 +178,7 @@ public class StorageGroupProcessorTest {
     batchInsertPlan1.setRowCount(times.length);
 
     processor.insertBatch(batchInsertPlan1);
-    processor.putAllWorkingTsFileProcessorIntoClosingList();
+    processor.asyncCloseAllWorkingTsFileProcessors();
 
     BatchInsertPlan batchInsertPlan2 = new BatchInsertPlan("root.vehicle.d0", measurements,
         dataTypes);
@@ -193,8 +193,8 @@ public class StorageGroupProcessorTest {
     batchInsertPlan2.setRowCount(times.length);
 
     processor.insertBatch(batchInsertPlan2);
-    processor.putAllWorkingTsFileProcessorIntoClosingList();
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.asyncCloseAllWorkingTsFileProcessors();
+    processor.syncCloseAllWorkingTsFileProcessors();
 
     QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
         null, null);
@@ -214,18 +214,18 @@ public class StorageGroupProcessorTest {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllWorkingTsFileProcessors();
     }
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllWorkingTsFileProcessors();
 
     for (int j = 10; j >= 1; j--) {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllWorkingTsFileProcessors();
     }
 
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllWorkingTsFileProcessors();
 
     QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
         null, null);
@@ -247,18 +247,18 @@ public class StorageGroupProcessorTest {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllWorkingTsFileProcessors();
     }
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllWorkingTsFileProcessors();
 
     for (int j = 10; j >= 1; j--) {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllWorkingTsFileProcessors();
     }
 
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllWorkingTsFileProcessors();
     processor.merge(true);
     while (mergeLock.get() == 0) {
       // wait
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index d28006b..24ad39f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -83,7 +83,7 @@ public class TTLTest {
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
     EnvironmentUtils.cleanEnv();
   }
 
@@ -160,7 +160,7 @@ public class TTLTest {
       insertPlan.setTime(initTime - 2000 + i);
       storageGroupProcessor.insert(insertPlan);
       if ((i + 1) % 300 == 0) {
-        storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+        storageGroupProcessor.asyncCloseAllWorkingTsFileProcessors();
       }
     }
     // unsequence data
@@ -168,7 +168,7 @@ public class TTLTest {
       insertPlan.setTime(initTime - 2000 + i);
       storageGroupProcessor.insert(insertPlan);
       if ((i + 1) % 300 == 0) {
-        storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+        storageGroupProcessor.asyncCloseAllWorkingTsFileProcessors();
       }
     }
   }
@@ -225,7 +225,7 @@ public class TTLTest {
   public void testTTLRemoval() throws StorageEngineException, QueryProcessException {
     prepareData();
 
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
 
     // files before ttl
     File seqDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), sg1);
@@ -335,7 +335,7 @@ public class TTLTest {
   @Test
   public void testTTLCleanFile() throws QueryProcessException {
     prepareData();
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
 
     assertEquals(4, storageGroupProcessor.getSequenceFileTreeSet().size());
     assertEquals(4, storageGroupProcessor.getUnSequenceFileList().size());