You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/12/27 08:45:31 UTC

[incubator-iotdb] branch pull_up_file_close created (now 36ba3d8)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch pull_up_file_close
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 36ba3d8  abstract TsFileFlushPolicy and allow specifying storage groups in flush command

This branch includes the following new commits:

     new 36ba3d8  abstract TsFileFlushPolicy and allow specifying storage groups in flush command

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: abstract TsFileFlushPolicy and allow specifying storage groups in flush command

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch pull_up_file_close
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 36ba3d8c6e5cb8108b146e3af3c208f99bd1ca6e
Author: jt2594838 <jt...@163.com>
AuthorDate: Fri Dec 27 16:44:43 2019 +0800

    abstract TsFileFlushPolicy and allow specifying storage groups in flush command
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 54 ++++++++----------
 .../iotdb/db/engine/flush/TsFileFlushPolicy.java   | 54 ++++++++++++++++++
 .../engine/storagegroup/StorageGroupProcessor.java | 37 ++++---------
 .../db/engine/storagegroup/TsFileProcessor.java    |  7 +--
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 31 ++++++++++-
 .../db/engine/cache/DeviceMetaDataCacheTest.java   |  3 +-
 .../storagegroup/StorageGroupProcessorTest.java    |  3 +-
 .../iotdb/db/engine/storagegroup/TTLTest.java      |  3 +-
 .../db/integration/IoTDBFlushQueryMergeTest.java   | 64 +++++++++++++++++++++-
 .../iotdb/db/query/reader/ReaderTestHelper.java    |  3 +-
 10 files changed, 188 insertions(+), 71 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index f379f0d..7c1c87a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.engine;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +38,8 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -48,6 +49,7 @@ import org.apache.iotdb.db.exception.path.PathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
 import org.apache.iotdb.db.exception.storageGroup.StorageGroupException;
+import org.apache.iotdb.db.exception.storageGroup.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.MNode;
@@ -71,7 +73,7 @@ public class StorageEngine implements IService {
 
   private final Logger logger;
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private static final long TTL_CHECK_INTERVAL = 60 * 1000;
+  private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
 
   /**
    * a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
@@ -94,6 +96,7 @@ public class StorageEngine implements IService {
   }
 
   private ScheduledExecutorService ttlCheckThread;
+  private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
 
   private StorageEngine() {
     logger = LoggerFactory.getLogger(StorageEngine.class);
@@ -116,7 +119,7 @@ public class StorageEngine implements IService {
     for (MNode storageGroup : sgNodes) {
       futures.add(recoveryThreadPool.submit((Callable<Void>) () -> {
         StorageGroupProcessor processor = new StorageGroupProcessor(systemDir,
-            storageGroup.getFullPath());
+            storageGroup.getFullPath(), fileFlushPolicy);
         processor.setDataTTL(storageGroup.getDataTTL());
         processorMap.put(storageGroup.getFullPath(), processor);
         logger.info("Storage Group Processor {} is recovered successfully",
@@ -182,7 +185,7 @@ public class StorageEngine implements IService {
           if (processor == null) {
             logger.info("construct a processor instance, the storage group is {}, Thread is {}",
                 storageGroupName, Thread.currentThread().getId());
-            processor = new StorageGroupProcessor(systemDir, storageGroupName);
+            processor = new StorageGroupProcessor(systemDir, storageGroupName, fileFlushPolicy);
             processor.setDataTTL(
                 MManager.getInstance().getNodeByPathWithCheck(storageGroupName).getDataTTL());
             processorMap.put(storageGroupName, processor);
@@ -264,6 +267,21 @@ public class StorageEngine implements IService {
     }
   }
 
+  public void asyncCloseProcessor(String storageGroupName, boolean isSeq)
+      throws StorageGroupNotSetException {
+    StorageGroupProcessor processor = processorMap.get(storageGroupName);
+    if (storageGroupName != null) {
+      processor.writeLock();
+      try {
+        processor.moveOneWorkProcessorToClosingList(isSeq);
+      } finally {
+        processor.writeUnlock();
+      }
+    } else {
+      throw new StorageGroupNotSetException(storageGroupName);
+    }
+  }
+
   /**
    * update data.
    */
@@ -308,34 +326,6 @@ public class StorageEngine implements IService {
   }
 
   /**
-   * Append one specified tsfile to the storage group. <b>This method is only provided for
-   * transmission module</b>
-   *
-   * @param storageGroupName the seriesPath of storage group
-   * @param appendFile the appended tsfile information
-   */
-  @SuppressWarnings("unused") // reimplement sync module
-  public boolean appendFileToStorageGroupProcessor(String storageGroupName,
-      TsFileResource appendFile,
-      String appendFilePath) throws StorageEngineException {
-    // TODO reimplement sync module
-    return true;
-  }
-
-  /**
-   * get all overlap TsFiles which are conflict with the appendFile.
-   *
-   * @param storageGroupName the seriesPath of storage group
-   * @param appendFile the appended tsfile information
-   */
-  @SuppressWarnings("unused") // reimplement sync module
-  public List<String> getOverlapFiles(String storageGroupName, TsFileResource appendFile,
-      String uuid) throws StorageEngineException {
-    // TODO reimplement sync module
-    return Collections.emptyList();
-  }
-
-  /**
    * count all Tsfiles which need to be upgraded
    *
    * @return total num of the tsfiles which need to be upgraded
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
new file mode 100644
index 0000000..06d8d19
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.flush;
+
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TsFileFlushPolicy is applied when a TsFileProcessor is full after insertion. For standalone
+ * IoTDB, the flush or close is executed without constraint. But in the distributed version, the
+ * close is controlled by the leader and should not be performed by the follower alone.
+ */
+public interface TsFileFlushPolicy {
+
+  void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor processor, boolean isSeq);
+
+  class DirectFlushPolicy implements TsFileFlushPolicy{
+
+    private static final Logger logger = LoggerFactory.getLogger(DirectFlushPolicy.class);
+
+    @Override
+    public void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor tsFileProcessor,
+        boolean isSeq) {
+      logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
+          tsFileProcessor.getWorkMemTableMemory(),
+          tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
+
+      if (tsFileProcessor.shouldClose()) {
+        storageGroupProcessor.moveOneWorkProcessorToClosingList(isSeq);
+      } else {
+        tsFileProcessor.asyncFlush();
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 741b902..646d9cc 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
@@ -184,10 +185,12 @@ public class StorageGroupProcessor {
   private long dataTTL = Long.MAX_VALUE;
 
   private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+  private TsFileFlushPolicy fileFlushPolicy;
 
-  public StorageGroupProcessor(String systemInfoDir, String storageGroupName)
+  public StorageGroupProcessor(String systemInfoDir, String storageGroupName, TsFileFlushPolicy fileFlushPolicy)
       throws StorageGroupProcessorException {
     this.storageGroupName = storageGroupName;
+    this.fileFlushPolicy = fileFlushPolicy;
 
     // construct the file schema
     this.schema = constructSchema(storageGroupName);
@@ -437,15 +440,7 @@ public class StorageGroupProcessor {
 
     // check memtable size and may asyncTryToFlush the work memtable
     if (tsFileProcessor.shouldFlush()) {
-      logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
-          tsFileProcessor.getWorkMemTableMemory(),
-          tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
-
-      if (tsFileProcessor.shouldClose()) {
-        moveOneWorkProcessorToClosingList(sequence);
-      } else {
-        tsFileProcessor.asyncFlush();
-      }
+      fileFlushPolicy.apply(this, tsFileProcessor, sequence);
     }
   }
 
@@ -470,15 +465,7 @@ public class StorageGroupProcessor {
 
     // check memtable size and may asyncTryToFlush the work memtable
     if (tsFileProcessor.shouldFlush()) {
-      logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
-          tsFileProcessor.getWorkMemTableMemory(),
-          tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
-
-      if (tsFileProcessor.shouldClose()) {
-        moveOneWorkProcessorToClosingList(sequence);
-      } else {
-        tsFileProcessor.asyncFlush();
-      }
+      fileFlushPolicy.apply(this, tsFileProcessor, sequence);
     }
   }
 
@@ -540,18 +527,18 @@ public class StorageGroupProcessor {
 
 
   /**
-   * only called by insert(), thread-safety should be ensured by caller
+   * thread-safety should be ensured by caller
    */
-  private void moveOneWorkProcessorToClosingList(boolean sequence) {
+  public void moveOneWorkProcessorToClosingList(boolean sequence) {
     //for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
     //for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
-    if (sequence) {
+    if (sequence && workSequenceTsFileProcessor != null) {
       closingSequenceTsFileProcessor.add(workSequenceTsFileProcessor);
       updateEndTimeMap(workSequenceTsFileProcessor);
       workSequenceTsFileProcessor.asyncClose();
       workSequenceTsFileProcessor = null;
       logger.info("close a sequence tsfile processor {}", storageGroupName);
-    } else {
+    } else if (workUnSequenceTsFileProcessor != null){
       closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
       workUnSequenceTsFileProcessor.asyncClose();
       workUnSequenceTsFileProcessor = null;
@@ -765,11 +752,11 @@ public class StorageGroupProcessor {
     return sensorSet;
   }
 
-  private void writeLock() {
+  public void writeLock() {
     insertLock.writeLock().lock();
   }
 
-  private void writeUnlock() {
+  public void writeUnlock() {
     insertLock.writeLock().unlock();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index eec8828..c403faf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -48,7 +48,6 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseTsFile
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -224,7 +223,7 @@ public class TsFileProcessor {
     }
   }
 
-  TsFileResource getTsFileResource() {
+  public TsFileResource getTsFileResource() {
     return tsFileResource;
   }
 
@@ -250,7 +249,7 @@ public class TsFileProcessor {
   }
 
 
-  boolean shouldClose() {
+  public boolean shouldClose() {
     long fileSize = tsFileResource.getFileSize();
     long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig()
         .getTsFileSizeThreshold();
@@ -541,7 +540,7 @@ public class TsFileProcessor {
     return flushingMemTables.size();
   }
 
-  long getWorkMemTableMemory() {
+  public long getWorkMemTableMemory() {
     return workMemTable.memSize();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index e1b374f..55df7c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.path.PathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.db.exception.storageGroup.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metrics.server.SqlArgument;
 import org.apache.iotdb.db.qp.QueryProcessor;
@@ -469,10 +470,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return false;
     }
     statement = statement.toLowerCase();
+    if (statement.startsWith("flush")) {
+      try {
+        execFlush(statement);
+      } catch (StorageGroupNotSetException e) {
+        throw new StorageEngineException(e);
+      }
+      return true;
+    }
     switch (statement) {
-      case "flush":
-        StorageEngine.getInstance().syncCloseAllProcessor();
-        return true;
       case "merge":
         StorageEngine.getInstance()
             .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
@@ -485,6 +491,25 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
+  private void execFlush(String statement) throws StorageGroupNotSetException {
+    String[] args = statement.split("\\s+");
+    if (args.length == 1) {
+      StorageEngine.getInstance().syncCloseAllProcessor();
+    } else if (args.length == 2){
+      String[] storageGroups = args[1].split(",\\s*");
+      for (String storageGroup : storageGroups) {
+        StorageEngine.getInstance().asyncCloseProcessor(storageGroup, true);
+        StorageEngine.getInstance().asyncCloseProcessor(storageGroup, false);
+      }
+    } else {
+      String[] storageGroups = args[1].split(",\\s*");
+      boolean isSeq = Boolean.parseBoolean(args[2]);
+      for (String storageGroup : storageGroups) {
+        StorageEngine.getInstance().asyncCloseProcessor(storageGroup, isSeq);
+      }
+    }
+  }
+
   @Override
   public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
     long t1 = System.currentTimeMillis();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
index d14e21e..5cec03d 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -68,7 +69,7 @@ public class DeviceMetaDataCacheTest {
     EnvironmentUtils.envSetUp();
     MetadataManagerHelper.initMetadata();
     ActiveTimeSeriesCounter.getInstance().init(storageGroup);
-    storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup);
+    storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup, new DirectFlushPolicy());
     insertData();
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 410de23..aed98a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -262,7 +263,7 @@ public class StorageGroupProcessorTest {
   class DummySGP extends StorageGroupProcessor {
 
     DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException {
-      super(systemInfoDir, storageGroupName);
+      super(systemInfoDir, storageGroupName, new TsFileFlushPolicy.DirectFlushPolicy());
     }
 
     @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 334cfe2..3187063 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.StartupException;
@@ -90,7 +91,7 @@ public class TTLTest {
     MManager.getInstance().setStorageGroupToMTree(sg1);
     MManager.getInstance().setStorageGroupToMTree(sg2);
     storageGroupProcessor = new StorageGroupProcessor(IoTDBDescriptor.getInstance().getConfig()
-        .getSystemDir(), sg1);
+        .getSystemDir(), sg1, new DirectFlushPolicy());
     MManager.getInstance().addPathToMTree(g1s1, TSDataType.INT64, TSEncoding.PLAIN,
         CompressionType.UNCOMPRESSED, Collections.emptyMap());
     storageGroupProcessor.addMeasurement("s1", TSDataType.INT64, TSEncoding.PLAIN,
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
index 286b6ce..c3c9513 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
@@ -18,6 +18,14 @@
  */
 package org.apache.iotdb.db.integration;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
@@ -25,8 +33,6 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import java.sql.*;
-import static org.junit.Assert.fail;
 
 public class IoTDBFlushQueryMergeTest {
 
@@ -81,7 +87,7 @@ public class IoTDBFlushQueryMergeTest {
   }
 
   @Test
-  public void selectAllSQLTest() throws ClassNotFoundException, SQLException {
+  public void selectAllSQLTest() throws ClassNotFoundException {
 
     Class.forName(Config.JDBC_DRIVER_NAME);
     try (Connection connection = DriverManager
@@ -102,4 +108,56 @@ public class IoTDBFlushQueryMergeTest {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void testFlushGivenGroup() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+    String insertTemplate =
+        "INSERT INTO root.group%d(timestamp, s1, s2, s3) VALUES (%d, %d, %f, %s)";
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.group1");
+      statement.execute("SET STORAGE GROUP TO root.group2");
+      statement.execute("SET STORAGE GROUP TO root.group3");
+
+      for (int i = 1; i <= 3; i++) {
+        for (int j = 10; j < 20; j++) {
+          statement.execute(String.format(insertTemplate, i, j, j, j*0.1, String.valueOf(j)));
+        }
+      }
+      statement.execute("FLUSH");
+
+      for (int i = 1; i <= 3; i++) {
+        for (int j = 0; j < 10; j++) {
+          statement.execute(String.format(insertTemplate, i, j, j, j*0.1, String.valueOf(j)));
+        }
+      }
+      statement.execute("FLUSH root.group1");
+      statement.execute("FLUSH root.group2,root.group3");
+
+      for (int i = 1; i <= 3; i++) {
+        for (int j = 0; j < 30; j++) {
+          statement.execute(String.format(insertTemplate, i, j, j, j*0.1, String.valueOf(j)));
+        }
+      }
+      statement.execute("FLUSH root.group1 true");
+      statement.execute("FLUSH root.group2,root.group3 false");
+
+      ResultSet resultSet = statement.executeQuery("SELECT * from root.group1,root.group2,root"
+          + ".group3");
+      int i = 0;
+      while (resultSet.next()) {
+        i ++;
+      }
+      assertEquals(30, i);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
index 2677c9b..5173405 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
@@ -52,7 +53,7 @@ public abstract class ReaderTestHelper {
     EnvironmentUtils.envSetUp();
     MetadataManagerHelper.initMetadata();
     ActiveTimeSeriesCounter.getInstance().init(storageGroup);
-    storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup);
+    storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup, new DirectFlushPolicy());
     insertData();
   }