You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/01/03 06:41:26 UTC

[incubator-iotdb] branch master updated: abstract TsFileFlushPolicy and allow specifying storage groups in flush command (#685)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ba7ff96  abstract TsFileFlushPolicy and allow specifying storage groups in flush command (#685)
ba7ff96 is described below

commit ba7ff96f39e5e6835218eeb784b72f60cb0dde3d
Author: Jiang Tian <jt...@163.com>
AuthorDate: Fri Jan 3 14:41:16 2020 +0800

    abstract TsFileFlushPolicy and allow specifying storage groups in flush command (#685)
    
    * abstract TsFileFlushPolicy and allow specifying storage groups in flush command
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 54 ++++++++----------
 .../iotdb/db/engine/flush/TsFileFlushPolicy.java   | 54 ++++++++++++++++++
 .../engine/storagegroup/StorageGroupProcessor.java | 37 ++++---------
 .../db/engine/storagegroup/TsFileProcessor.java    |  6 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 31 ++++++++++-
 .../db/engine/cache/DeviceMetaDataCacheTest.java   |  3 +-
 .../storagegroup/StorageGroupProcessorTest.java    |  3 +-
 .../iotdb/db/engine/storagegroup/TTLTest.java      |  3 +-
 .../db/integration/IoTDBFlushQueryMergeTest.java   | 64 +++++++++++++++++++++-
 .../iotdb/db/query/reader/ReaderTestHelper.java    |  3 +-
 10 files changed, 188 insertions(+), 70 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index f379f0d..6e95fdf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.engine;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +38,8 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -48,6 +49,7 @@ import org.apache.iotdb.db.exception.path.PathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
 import org.apache.iotdb.db.exception.storageGroup.StorageGroupException;
+import org.apache.iotdb.db.exception.storageGroup.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.MNode;
@@ -71,7 +73,7 @@ public class StorageEngine implements IService {
 
   private final Logger logger;
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private static final long TTL_CHECK_INTERVAL = 60 * 1000;
+  private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
 
   /**
    * a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
@@ -94,6 +96,7 @@ public class StorageEngine implements IService {
   }
 
   private ScheduledExecutorService ttlCheckThread;
+  private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
 
   private StorageEngine() {
     logger = LoggerFactory.getLogger(StorageEngine.class);
@@ -116,7 +119,7 @@ public class StorageEngine implements IService {
     for (MNode storageGroup : sgNodes) {
       futures.add(recoveryThreadPool.submit((Callable<Void>) () -> {
         StorageGroupProcessor processor = new StorageGroupProcessor(systemDir,
-            storageGroup.getFullPath());
+            storageGroup.getFullPath(), fileFlushPolicy);
         processor.setDataTTL(storageGroup.getDataTTL());
         processorMap.put(storageGroup.getFullPath(), processor);
         logger.info("Storage Group Processor {} is recovered successfully",
@@ -182,7 +185,7 @@ public class StorageEngine implements IService {
           if (processor == null) {
             logger.info("construct a processor instance, the storage group is {}, Thread is {}",
                 storageGroupName, Thread.currentThread().getId());
-            processor = new StorageGroupProcessor(systemDir, storageGroupName);
+            processor = new StorageGroupProcessor(systemDir, storageGroupName, fileFlushPolicy);
             processor.setDataTTL(
                 MManager.getInstance().getNodeByPathWithCheck(storageGroupName).getDataTTL());
             processorMap.put(storageGroupName, processor);
@@ -264,6 +267,21 @@ public class StorageEngine implements IService {
     }
   }
 
+  public void asyncCloseProcessor(String storageGroupName, boolean isSeq)
+      throws StorageGroupNotSetException {
+    StorageGroupProcessor processor = processorMap.get(storageGroupName);
+    if (processor != null) {
+      processor.writeLock();
+      try {
+        processor.moveOneWorkProcessorToClosingList(isSeq);
+      } finally {
+        processor.writeUnlock();
+      }
+    } else {
+      throw new StorageGroupNotSetException(storageGroupName);
+    }
+  }
+
   /**
    * update data.
    */
@@ -308,34 +326,6 @@ public class StorageEngine implements IService {
   }
 
   /**
-   * Append one specified tsfile to the storage group. <b>This method is only provided for
-   * transmission module</b>
-   *
-   * @param storageGroupName the seriesPath of storage group
-   * @param appendFile the appended tsfile information
-   */
-  @SuppressWarnings("unused") // reimplement sync module
-  public boolean appendFileToStorageGroupProcessor(String storageGroupName,
-      TsFileResource appendFile,
-      String appendFilePath) throws StorageEngineException {
-    // TODO reimplement sync module
-    return true;
-  }
-
-  /**
-   * get all overlap TsFiles which are conflict with the appendFile.
-   *
-   * @param storageGroupName the seriesPath of storage group
-   * @param appendFile the appended tsfile information
-   */
-  @SuppressWarnings("unused") // reimplement sync module
-  public List<String> getOverlapFiles(String storageGroupName, TsFileResource appendFile,
-      String uuid) throws StorageEngineException {
-    // TODO reimplement sync module
-    return Collections.emptyList();
-  }
-
-  /**
    * count all Tsfiles which need to be upgraded
    *
    * @return total num of the tsfiles which need to be upgraded
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
new file mode 100644
index 0000000..06d8d19
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.flush;
+
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TsFileFlushPolicy is applied when a TsFileProcessor is full after insertion. For standalone
+ * IoTDB, the flush or close is executed without constraint. But in the distributed version, the
+ * close is controlled by the leader and should not be performed by the follower alone.
+ */
+public interface TsFileFlushPolicy {
+
+  void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor processor, boolean isSeq);
+
+  class DirectFlushPolicy implements TsFileFlushPolicy{
+
+    private static final Logger logger = LoggerFactory.getLogger(DirectFlushPolicy.class);
+
+    @Override
+    public void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor tsFileProcessor,
+        boolean isSeq) {
+      logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
+          tsFileProcessor.getWorkMemTableMemory(),
+          tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
+
+      if (tsFileProcessor.shouldClose()) {
+        storageGroupProcessor.moveOneWorkProcessorToClosingList(isSeq);
+      } else {
+        tsFileProcessor.asyncFlush();
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index d58b83a..8102955 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
@@ -184,14 +185,16 @@ public class StorageGroupProcessor {
   private long dataTTL = Long.MAX_VALUE;
 
   private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+  private TsFileFlushPolicy fileFlushPolicy;
 
   // allDirectFileVersions records the versions of the direct TsFiles (generated by flush), not
   // including the files generated by merge
   private Set<Long> allDirectFileVersions = new HashSet<>();
 
-  public StorageGroupProcessor(String systemInfoDir, String storageGroupName)
+  public StorageGroupProcessor(String systemInfoDir, String storageGroupName, TsFileFlushPolicy fileFlushPolicy)
       throws StorageGroupProcessorException {
     this.storageGroupName = storageGroupName;
+    this.fileFlushPolicy = fileFlushPolicy;
 
     // construct the file schema
     this.schema = constructSchema(storageGroupName);
@@ -448,15 +451,7 @@ public class StorageGroupProcessor {
 
     // check memtable size and may asyncTryToFlush the work memtable
     if (tsFileProcessor.shouldFlush()) {
-      logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
-          tsFileProcessor.getWorkMemTableMemory(),
-          tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
-
-      if (tsFileProcessor.shouldClose()) {
-        moveOneWorkProcessorToClosingList(sequence);
-      } else {
-        tsFileProcessor.asyncFlush();
-      }
+      fileFlushPolicy.apply(this, tsFileProcessor, sequence);
     }
   }
 
@@ -481,15 +476,7 @@ public class StorageGroupProcessor {
 
     // check memtable size and may asyncTryToFlush the work memtable
     if (tsFileProcessor.shouldFlush()) {
-      logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
-          tsFileProcessor.getWorkMemTableMemory(),
-          tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
-
-      if (tsFileProcessor.shouldClose()) {
-        moveOneWorkProcessorToClosingList(sequence);
-      } else {
-        tsFileProcessor.asyncFlush();
-      }
+      fileFlushPolicy.apply(this, tsFileProcessor, sequence);
     }
   }
 
@@ -552,18 +539,18 @@ public class StorageGroupProcessor {
 
 
   /**
-   * only called by insert(), thread-safety should be ensured by caller
+   * thread-safety should be ensured by caller
    */
-  private void moveOneWorkProcessorToClosingList(boolean sequence) {
+  public void moveOneWorkProcessorToClosingList(boolean sequence) {
     //for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
     //for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
-    if (sequence) {
+    if (sequence && workSequenceTsFileProcessor != null) {
       closingSequenceTsFileProcessor.add(workSequenceTsFileProcessor);
       updateEndTimeMap(workSequenceTsFileProcessor);
       workSequenceTsFileProcessor.asyncClose();
       workSequenceTsFileProcessor = null;
       logger.info("close a sequence tsfile processor {}", storageGroupName);
-    } else {
+    } else if (workUnSequenceTsFileProcessor != null){
       closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
       workUnSequenceTsFileProcessor.asyncClose();
       workUnSequenceTsFileProcessor = null;
@@ -777,11 +764,11 @@ public class StorageGroupProcessor {
     return sensorSet;
   }
 
-  private void writeLock() {
+  public void writeLock() {
     insertLock.writeLock().lock();
   }
 
-  private void writeUnlock() {
+  public void writeUnlock() {
     insertLock.writeLock().unlock();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index cdf649e..2680b96 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -226,7 +226,7 @@ public class TsFileProcessor {
     }
   }
 
-  TsFileResource getTsFileResource() {
+  public TsFileResource getTsFileResource() {
     return tsFileResource;
   }
 
@@ -252,7 +252,7 @@ public class TsFileProcessor {
   }
 
 
-  boolean shouldClose() {
+  public boolean shouldClose() {
     long fileSize = tsFileResource.getFileSize();
     long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig()
         .getTsFileSizeThreshold();
@@ -543,7 +543,7 @@ public class TsFileProcessor {
     return flushingMemTables.size();
   }
 
-  long getWorkMemTableMemory() {
+  public long getWorkMemTableMemory() {
     return workMemTable.memSize();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 1f73572..7fb8456 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.path.PathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.db.exception.storageGroup.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metrics.server.SqlArgument;
 import org.apache.iotdb.db.qp.QueryProcessor;
@@ -469,10 +470,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return false;
     }
     statement = statement.toLowerCase();
+    if (statement.startsWith("flush")) {
+      try {
+        execFlush(statement);
+      } catch (StorageGroupNotSetException e) {
+        throw new StorageEngineException(e);
+      }
+      return true;
+    }
     switch (statement) {
-      case "flush":
-        StorageEngine.getInstance().syncCloseAllProcessor();
-        return true;
       case "merge":
         StorageEngine.getInstance()
             .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
@@ -485,6 +491,25 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
+  private void execFlush(String statement) throws StorageGroupNotSetException {
+    String[] args = statement.split("\\s+");
+    if (args.length == 1) {
+      StorageEngine.getInstance().syncCloseAllProcessor();
+    } else if (args.length == 2){
+      String[] storageGroups = args[1].split(",");
+      for (String storageGroup : storageGroups) {
+        StorageEngine.getInstance().asyncCloseProcessor(storageGroup, true);
+        StorageEngine.getInstance().asyncCloseProcessor(storageGroup, false);
+      }
+    } else {
+      String[] storageGroups = args[1].split(",");
+      boolean isSeq = Boolean.parseBoolean(args[2]);
+      for (String storageGroup : storageGroups) {
+        StorageEngine.getInstance().asyncCloseProcessor(storageGroup, isSeq);
+      }
+    }
+  }
+
   @Override
   public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
     long t1 = System.currentTimeMillis();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
index d14e21e..5cec03d 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -68,7 +69,7 @@ public class DeviceMetaDataCacheTest {
     EnvironmentUtils.envSetUp();
     MetadataManagerHelper.initMetadata();
     ActiveTimeSeriesCounter.getInstance().init(storageGroup);
-    storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup);
+    storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup, new DirectFlushPolicy());
     insertData();
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 410de23..aed98a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -262,7 +263,7 @@ public class StorageGroupProcessorTest {
   class DummySGP extends StorageGroupProcessor {
 
     DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException {
-      super(systemInfoDir, storageGroupName);
+      super(systemInfoDir, storageGroupName, new TsFileFlushPolicy.DirectFlushPolicy());
     }
 
     @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 334cfe2..3187063 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.StartupException;
@@ -90,7 +91,7 @@ public class TTLTest {
     MManager.getInstance().setStorageGroupToMTree(sg1);
     MManager.getInstance().setStorageGroupToMTree(sg2);
     storageGroupProcessor = new StorageGroupProcessor(IoTDBDescriptor.getInstance().getConfig()
-        .getSystemDir(), sg1);
+        .getSystemDir(), sg1, new DirectFlushPolicy());
     MManager.getInstance().addPathToMTree(g1s1, TSDataType.INT64, TSEncoding.PLAIN,
         CompressionType.UNCOMPRESSED, Collections.emptyMap());
     storageGroupProcessor.addMeasurement("s1", TSDataType.INT64, TSEncoding.PLAIN,
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
index 286b6ce..c3c9513 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
@@ -18,6 +18,14 @@
  */
 package org.apache.iotdb.db.integration;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
@@ -25,8 +33,6 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import java.sql.*;
-import static org.junit.Assert.fail;
 
 public class IoTDBFlushQueryMergeTest {
 
@@ -81,7 +87,7 @@ public class IoTDBFlushQueryMergeTest {
   }
 
   @Test
-  public void selectAllSQLTest() throws ClassNotFoundException, SQLException {
+  public void selectAllSQLTest() throws ClassNotFoundException {
 
     Class.forName(Config.JDBC_DRIVER_NAME);
     try (Connection connection = DriverManager
@@ -102,4 +108,56 @@ public class IoTDBFlushQueryMergeTest {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void testFlushGivenGroup() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+    String insertTemplate =
+        "INSERT INTO root.group%d(timestamp, s1, s2, s3) VALUES (%d, %d, %f, %s)";
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.group1");
+      statement.execute("SET STORAGE GROUP TO root.group2");
+      statement.execute("SET STORAGE GROUP TO root.group3");
+
+      for (int i = 1; i <= 3; i++) {
+        for (int j = 10; j < 20; j++) {
+          statement.execute(String.format(insertTemplate, i, j, j, j*0.1, String.valueOf(j)));
+        }
+      }
+      statement.execute("FLUSH");
+
+      for (int i = 1; i <= 3; i++) {
+        for (int j = 0; j < 10; j++) {
+          statement.execute(String.format(insertTemplate, i, j, j, j*0.1, String.valueOf(j)));
+        }
+      }
+      statement.execute("FLUSH root.group1");
+      statement.execute("FLUSH root.group2,root.group3");
+
+      for (int i = 1; i <= 3; i++) {
+        for (int j = 0; j < 30; j++) {
+          statement.execute(String.format(insertTemplate, i, j, j, j*0.1, String.valueOf(j)));
+        }
+      }
+      statement.execute("FLUSH root.group1 true");
+      statement.execute("FLUSH root.group2,root.group3 false");
+
+      ResultSet resultSet = statement.executeQuery("SELECT * from root.group1,root.group2,root"
+          + ".group3");
+      int i = 0;
+      while (resultSet.next()) {
+        i ++;
+      }
+      assertEquals(30, i);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
index 2677c9b..5173405 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
@@ -52,7 +53,7 @@ public abstract class ReaderTestHelper {
     EnvironmentUtils.envSetUp();
     MetadataManagerHelper.initMetadata();
     ActiveTimeSeriesCounter.getInstance().init(storageGroup);
-    storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup);
+    storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup, new DirectFlushPolicy());
     insertData();
   }