You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/21 06:02:55 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: add filenode processor test

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new adb09a5  add filenode processor test
adb09a5 is described below

commit adb09a5477b1d32a1d5a829849de3d8367a02706
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jun 21 14:02:44 2019 +0800

    add filenode processor test
---
 .../db/engine/filenode/CopyOnReadLinkedList.java   |  5 ++
 .../db/engine/filenodeV2/FileNodeManagerV2.java    |  2 +-
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 76 ++++++++++++++++------
 .../filenodeV2/UnsealedTsFileProcessorV2.java      | 17 +++--
 .../iotdb/db/engine/memtable/MemTablePool.java     |  9 +++
 .../db/engine/filenode/FileNodeProcessorTest.java  |  2 +-
 .../engine/filenodeV2/FileNodeProcessorV2Test.java | 38 +++++++++--
 .../filenodeV2/UnsealedTsFileProcessorV2Test.java  | 12 ++--
 .../engine/memtable/MemTableFlushTaskV2Test.java   |  2 +-
 .../iotdb/db/sync/sender/SingleClientSyncTest.java |  2 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    | 38 +++++++++--
 11 files changed, 158 insertions(+), 45 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
index 3353a6c..e6c8249 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
@@ -38,6 +38,10 @@ public class CopyOnReadLinkedList<T> {
     data.add(d);
   }
 
+  public synchronized boolean contains(T d) {
+    return data.contains(d);
+  }
+
   public synchronized void remove(T d) {
     data.remove(d);
   }
@@ -65,4 +69,5 @@ public class CopyOnReadLinkedList<T> {
   public int size() {
     return data.size();
   }
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index e0e6830..7af1dbd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -429,7 +429,7 @@ public class FileNodeManagerV2 implements IStatistic, IService {
   private void deleteFileNodeBlocked(String processorName) throws IOException {
     LOGGER.info("Forced to delete the filenode processor {}", processorName);
     FileNodeProcessorV2 processor = processorMap.get(processorName);
-    processor.syncCloseFileNode(() -> {
+    processor.syncCloseAndReleaseFileNode(() -> {
       String fileNodePath = IoTDBDescriptor.getInstance().getConfig().getFileNodeDir();
       fileNodePath = FilePathUtils.regularizePath(fileNodePath) + processorName;
       try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index b86d9fb..7782b55 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -40,7 +40,6 @@ import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.FileNodeProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -167,7 +166,7 @@ public class FileNodeProcessorV2 {
       latestFlushedTimeForEachDevice.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
 
       boolean result;
-      // write to sequence or unsequence file
+      // write to sequence or unSequence file
       if (tsRecord.time > latestFlushedTimeForEachDevice.get(tsRecord.deviceId)) {
         result = writeUnsealedDataFile(tsRecord, true);
         insertResult = result ? 1 : -1;
@@ -194,12 +193,11 @@ public class FileNodeProcessorV2 {
       if (sequence) {
         if (workSequenceTsFileProcessor == null) {
           String baseDir = directories.getNextFolderForTsfile();
-          String filePath = Paths.get(baseDir, System.currentTimeMillis() + "-" + versionController.nextVersion())
-              .toString();
-          System.out.println(filePath);
-          workSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
-              new File(filePath),
-              fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback);
+          String filePath = Paths.get(baseDir, System.currentTimeMillis() + "-" + versionController.nextVersion()).toString();
+
+          workSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
+              fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback, this::updateLatestFlushTimeCallback);
+
           sequenceFileList.add(workSequenceTsFileProcessor.getTsFileResource());
         }
         unsealedTsFileProcessor = workSequenceTsFileProcessor;
@@ -207,11 +205,12 @@ public class FileNodeProcessorV2 {
         if (workUnSequenceTsFileProcessor == null) {
           // TODO check if the disk is full
           String baseDir = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDir();
-          String filePath = Paths.get(baseDir, System.currentTimeMillis() + "" + + versionController.nextVersion())
-              .toString();
-          workUnSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
-              new File(filePath),
-              fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback);
+          new File(baseDir).mkdirs();
+          String filePath = Paths.get(baseDir, System.currentTimeMillis() + "-" + +versionController.nextVersion()).toString();
+
+          workUnSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
+              fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback, this::updateLatestFlushTimeCallback);
+
           unSequenceFileList.add(workUnSequenceTsFileProcessor.getTsFileResource());
         }
         unsealedTsFileProcessor = workUnSequenceTsFileProcessor;
@@ -246,7 +245,6 @@ public class FileNodeProcessorV2 {
           deviceId, measurementId);
       List<TsFileResourceV2> unsequnceResources = getFileReSourceListForQuery(unSequenceFileList,
           deviceId, measurementId);
-
       return new QueryDataSourceV2(
           new GlobalSortedSeriesDataSourceV2(new Path(deviceId, measurementId), sequnceResources),
           new GlobalSortedSeriesDataSourceV2(new Path(deviceId, measurementId),
@@ -308,14 +306,22 @@ public class FileNodeProcessorV2 {
     }
 
     unsealedTsFileProcessor.asyncFlush();
+  }
+
 
-    // update the largest timestamp in the last flushing memtable
-    for (Entry<String, Long> entry : latestTimeForEachDevice.entrySet()) {
-      latestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+  public boolean updateLatestFlushTimeCallback() {
+    lock.writeLock().lock();
+    try {
+      // update the largest timestamp in the last flushing memtable
+      for (Entry<String, Long> entry : latestTimeForEachDevice.entrySet()) {
+        latestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+      }
+    } finally {
+      lock.writeLock().unlock();
     }
+    return true;
   }
 
-
   /**
    * put the memtable back to the MemTablePool and make the metadata in writer visible
    */
@@ -323,7 +329,11 @@ public class FileNodeProcessorV2 {
   public void closeUnsealedTsFileProcessorCallback(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
     lock.writeLock().lock();
     try {
-      closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
+      if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) {
+        closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
+      } else {
+        closingUnSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
+      }
       // end time with one start time
       TsFileResourceV2 resource = unsealedTsFileProcessor.getTsFileResource();
       synchronized (resource) {
@@ -358,9 +368,32 @@ public class FileNodeProcessorV2 {
   }
 
   /**
-   * Block this method until this file node can be closed.
+   * Blocks the calling thread until all tsfile processors are closed.
    */
-  public void syncCloseFileNode(Supplier<Boolean> removeProcessorFromManagerCallback){
+  public void syncCloseFileNode(){
+    lock.writeLock().lock();
+    try {
+      asyncForceClose();
+      while (true) {
+        if (closingSequenceTsFileProcessor.isEmpty() && closingUnSequenceTsFileProcessor.isEmpty()
+            && workSequenceTsFileProcessor == null && workUnSequenceTsFileProcessor == null) {
+          break;
+        }
+        closeFileNodeCondition.await();
+      }
+    } catch (InterruptedException e) {
+      LOGGER.error("CloseFileNodeConditon occurs error while waiting for closing the file node {}",
+          storageGroupName, e);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+
+  /**
+   * Blocks the calling thread until this file node can be closed.
+   */
+  public void syncCloseAndReleaseFileNode(Supplier<Boolean> removeProcessorFromManagerCallback){
     lock.writeLock().lock();
     try {
       asyncForceClose();
@@ -381,6 +414,7 @@ public class FileNodeProcessorV2 {
     }
   }
 
+
   public UnsealedTsFileProcessorV2 getWorkSequenceTsFileProcessor() {
     return workSequenceTsFileProcessor;
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 3185a2e..ed87a71 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.memtable.EmptyMemTable;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -73,6 +74,8 @@ public class UnsealedTsFileProcessorV2 {
 
   private Consumer<UnsealedTsFileProcessorV2> closeUnsealedFileCallback;
 
+  private Supplier flushUpdateLatestFlushTimeCallback;
+
   /**
    * sync this object in query() and asyncFlush()
    */
@@ -80,7 +83,8 @@ public class UnsealedTsFileProcessorV2 {
 
   public UnsealedTsFileProcessorV2(String storageGroupName, File tsfile, FileSchema fileSchema,
       VersionController versionController,
-      Consumer<UnsealedTsFileProcessorV2> closeUnsealedFileCallback)
+      Consumer<UnsealedTsFileProcessorV2> closeUnsealedFileCallback,
+      Supplier flushUpdateLatestFlushTimeCallback)
       throws IOException {
     this.storageGroupName = storageGroupName;
     this.fileSchema = fileSchema;
@@ -88,6 +92,7 @@ public class UnsealedTsFileProcessorV2 {
     this.versionController = versionController;
     this.writer = new NativeRestorableIOWriter(tsfile);
     this.closeUnsealedFileCallback = closeUnsealedFileCallback;
+    this.flushUpdateLatestFlushTimeCallback = flushUpdateLatestFlushTimeCallback;
   }
 
   /**
@@ -133,8 +138,7 @@ public class UnsealedTsFileProcessorV2 {
     try {
       writer.makeMetadataVisible();
       flushingMemTables.remove(memTable);
-      MemTablePool.getInstance().putBack(memTable);
-      LOGGER.info("Processor {} return back a memtable to MemTablePool", storageGroupName);
+      MemTablePool.getInstance().putBack(memTable, storageGroupName);
     } finally {
       flushQueryLock.writeLock().unlock();
     }
@@ -156,6 +160,7 @@ public class UnsealedTsFileProcessorV2 {
       }
       flushingMemTables.addLast(workMemTable);
       FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
+      flushUpdateLatestFlushTimeCallback.get();
       workMemTable = null;
     } finally {
       flushQueryLock.writeLock().unlock();
@@ -170,6 +175,7 @@ public class UnsealedTsFileProcessorV2 {
       tmpMemTable = workMemTable == null ? new EmptyMemTable() : workMemTable;
       flushingMemTables.addLast(tmpMemTable);
       FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
+      flushUpdateLatestFlushTimeCallback.get();
       workMemTable = null;
     } finally {
       flushQueryLock.writeLock().unlock();
@@ -197,9 +203,10 @@ public class UnsealedTsFileProcessorV2 {
 
   public synchronized void asyncClose() {
     flushingMemTables.add(workMemTable == null ? new EmptyMemTable() : workMemTable);
-    workMemTable = null;
-    shouldClose = true;
     FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
+    flushUpdateLatestFlushTimeCallback.get();
+    shouldClose = true;
+    workMemTable = null;
   }
 
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index 349f790..349d3fc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -86,6 +86,15 @@ public class MemTablePool {
     }
   }
 
+  public void putBack(IMemTable memTable, String storageGroup) {
+    synchronized (emptyMemTables) {
+      memTable.clear();
+      emptyMemTables.push(memTable);
+      emptyMemTables.notify();
+      LOGGER.info("{} return a memtable, stack size {}", storageGroup, emptyMemTables.size());
+    }
+  }
+
   public static MemTablePool getInstance() {
     return InstanceHolder.INSTANCE;
   }
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorTest.java
index 44d2bbe..e135d42 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorTest.java
@@ -62,7 +62,7 @@ public class FileNodeProcessorTest {
   private String processName = "root.vehicle";
 
   @Before
-  public void setUp() throws FileNodeProcessorException, StartupException {
+  public void setUp() throws FileNodeProcessorException, StartupException, IOException {
     // init metadata
     EnvironmentUtils.envSetUp();
     MetadataManagerHelper.initMetadata();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index 103e803..e14ebbe 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -32,7 +32,6 @@ import org.junit.Test;
 public class FileNodeProcessorV2Test {
 
   private String storageGroup = "storage_group1";
-  private String baseDir = "data";
   private String systemDir = "data/info";
   private String deviceId = "root.vehicle.d0";
   private String measurementId = "s0";
@@ -48,12 +47,12 @@ public class FileNodeProcessorV2Test {
   @After
   public void tearDown() throws Exception {
     EnvironmentUtils.cleanEnv();
-    EnvironmentUtils.cleanDir(baseDir);
+    EnvironmentUtils.cleanDir(systemDir);
   }
 
 
   @Test
-  public void testAsyncClose() {
+  public void testSequenceSyncClose() {
     for (int j = 1; j <= 100; j++) {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
@@ -61,7 +60,7 @@ public class FileNodeProcessorV2Test {
       processor.asyncForceClose();
     }
 
-    processor.syncCloseFileNode(() -> null);
+    processor.syncCloseFileNode();
     QueryDataSourceV2 queryDataSource = processor.query(deviceId, measurementId);
     Assert.assertEquals(queryDataSource.getSeqDataSource().getQueryTsFiles().size(), 100);
     for (TsFileResourceV2 resource : queryDataSource.getSeqDataSource().getQueryTsFiles()) {
@@ -70,4 +69,35 @@ public class FileNodeProcessorV2Test {
   }
 
 
+  @Test
+  public void testSeqAndUnSeqSyncClose() {
+
+    for (int j = 21; j <= 30; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      processor.insert(record);
+      processor.asyncForceClose();
+    }
+    processor.syncCloseFileNode();
+
+    for (int j = 10; j >= 1; j--) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      processor.insert(record);
+      processor.asyncForceClose();
+    }
+
+    processor.syncCloseFileNode();
+
+    QueryDataSourceV2 queryDataSource = processor.query(deviceId, measurementId);
+    Assert.assertEquals(10, queryDataSource.getSeqDataSource().getQueryTsFiles().size());
+    Assert.assertEquals(10, queryDataSource.getUnSequenceDataSource().getQueryTsFiles().size());
+    for (TsFileResourceV2 resource : queryDataSource.getSeqDataSource().getQueryTsFiles()) {
+      Assert.assertTrue(resource.isClosed());
+    }
+    for (TsFileResourceV2 resource : queryDataSource.getUnSequenceDataSource().getQueryTsFiles()) {
+      Assert.assertTrue(resource.isClosed());
+    }
+  }
+
 }
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index a51df9e..c09f69e 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -70,7 +70,7 @@ public class UnsealedTsFileProcessorV2Test {
   @Test
   public void testWriteAndFlush() throws WriteProcessException, IOException {
     processor = new UnsealedTsFileProcessorV2(storageGroup, new File(filePath),
-        FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE, x->{});
+        FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE, x->{}, ()-> true);
 
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
         .query(deviceId, measurementId, dataType, props);
@@ -114,7 +114,7 @@ public class UnsealedTsFileProcessorV2Test {
   @Test
   public void testMultiFlush() throws WriteProcessException, IOException {
     processor = new UnsealedTsFileProcessorV2(storageGroup, new File(filePath),
-        FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE, x->{});
+        FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE, x->{}, ()->true);
 
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
         .query(deviceId, measurementId, dataType, props);
@@ -123,8 +123,8 @@ public class UnsealedTsFileProcessorV2Test {
     assertTrue(left.isEmpty());
     assertEquals(0, right.size());
 
-    for (int flushId = 0; flushId < 100; flushId++) {
-      for (int i = 1; i <= 100; i++) {
+    for (int flushId = 0; flushId < 10; flushId++) {
+      for (int i = 1; i <= 10; i++) {
         TSRecord record = new TSRecord(i, deviceId);
         record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
         processor.write(record);
@@ -137,7 +137,7 @@ public class UnsealedTsFileProcessorV2Test {
     left = pair.left;
     right = pair.right;
     assertTrue(left.isEmpty());
-    assertEquals(100, right.size());
+    assertEquals(10, right.size());
     assertEquals(measurementId, right.get(0).getMeasurementUid());
     assertEquals(dataType, right.get(0).getTsDataType());
   }
@@ -156,7 +156,7 @@ public class UnsealedTsFileProcessorV2Test {
             }
             resource.setClosed(true);
           }
-        });
+        }, ()->true);
 
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
         .query(deviceId, measurementId, dataType, props);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
index 7ec738f..da83217 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
@@ -62,7 +62,7 @@ public class MemTableFlushTaskV2Test {
     MemTableFlushTaskV2 memTableFlushTask = new MemTableFlushTaskV2(writer, storageGroup,
         memtable -> {
           writer.makeMetadataVisible();
-          MemTablePool.getInstance().putBack(memtable);
+          MemTablePool.getInstance().putBack(memtable, storageGroup);
         });
     assertTrue(writer
         .getVisibleMetadatas(MemTableTestUtils.deviceId0, MemTableTestUtils.measurementId0,
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
index 79e13a3..5377bca 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
@@ -179,7 +179,7 @@ public class SingleClientSyncTest {
     fileSenderImpl.setConfig(config);
   }
 
-  public void setUp() throws StartupException {
+  public void setUp() throws StartupException, IOException {
     if (testFlag) {
       EnvironmentUtils.closeStatMonitor();
       EnvironmentUtils.closeMemControl();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 84df8d5..edb6a20 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -114,10 +114,8 @@ public class EnvironmentUtils {
     cleanDir(config.getDerbyHome());
     // delete index
     cleanDir(config.getIndexFileDir());
-    // delte data
-    cleanDir("data");
-    // delte derby log
-    // cleanDir("derby.log");
+    // delete data
+    cleanDir(config.getDataDir());
   }
 
   public static void cleanDir(String dir) throws IOException {
@@ -150,7 +148,9 @@ public class EnvironmentUtils {
     config.setEnableMemMonitor(false);
   }
 
-  public static void envSetUp() throws StartupException {
+  public static void envSetUp() throws StartupException, IOException {
+
+    createAllDir();
     // disable the memory control
     config.setEnableMemMonitor(false);
     // disable the system monitor
@@ -176,4 +176,32 @@ public class EnvironmentUtils {
       LOGGER.error("", e);
     }
   }
+
+  private static void createAllDir() throws IOException {
+    // create bufferwrite
+    for (String path : directories.getAllTsFileFolders()) {
+      createDir(path);
+    }
+    // create overflow
+    createDir(config.getOverflowDataDir());
+    // create filenode
+    createDir(config.getFileNodeDir());
+    // create metadata
+    createDir(config.getMetadataDir());
+    // create wal
+    createDir(config.getWalFolder());
+    // create derby
+    createDir(config.getDerbyHome());
+    // create index
+    createDir(config.getIndexFileDir());
+    // create data
+    createDir("data");
+    // delete derby log
+    // cleanDir("derby.log");
+  }
+
+  public static void createDir(String dir) throws IOException {
+    File file = new File(dir);
+    file.mkdirs();
+  }
 }