Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/04/20 15:58:20 UTC

[incubator-iotdb] 01/01: revert to status without new BufferWriter

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch replace_list_by_array
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit d1cbd768d6fe243d308b430cc0405ef3b3962ed1
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sat Apr 20 23:57:56 2019 +0800

    revert to status without new BufferWriter
---
 iotdb/iotdb/conf/logback.xml                       |   2 -
 .../db/engine/tsfiledata/TsFileProcessor.java      | 823 ---------------------
 .../db/query/control/QueryResourceManager.java     |  23 -
 .../EngineExecutorWithoutTimeGenerator.java        |  58 --
 .../iotdb/db/query/executor/EngineQueryRouter.java |  20 -
 .../db/engine/tsfiledata/TsFileProcessorTest.java  | 251 -------
 .../integration/QueryDataFromUnclosedTsFileIT.java | 107 ---
 7 files changed, 1284 deletions(-)

diff --git a/iotdb/iotdb/conf/logback.xml b/iotdb/iotdb/conf/logback.xml
index 1f8a915..2629723 100644
--- a/iotdb/iotdb/conf/logback.xml
+++ b/iotdb/iotdb/conf/logback.xml
@@ -111,7 +111,6 @@
     </appender>
     <logger level="info" name="org.apache.iotdb.db.service"/>
     <logger level="info" name="org.apache.iotdb.db.conf"/>
-
     <!-- a log appender that collects all log records whose level is greater than debug -->
     <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILEALL">
         <file>${IOTDB_HOME}/logs/log_all.log</file>
@@ -130,7 +129,6 @@
             <level>INFO</level>
         </filter>
     </appender>
-
     <root level="info">
         <appender-ref ref="FILEDEBUG"/>
         <appender-ref ref="FILEWARN"/>
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
deleted file mode 100644
index 3e7bba4..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ /dev/null
@@ -1,823 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.tsfiledata;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.Directories;
-import org.apache.iotdb.db.engine.Processor;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
-import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
-import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
-import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
-import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.pool.FlushManager;
-import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
-import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
-import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.qp.constant.DatetimeUtils;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.utils.ImmediateFuture;
-import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.db.utils.QueryUtils;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Each storage group has a TsFileProcessor instance. Though many clients read/write data
- * by accessing TsFileProcessor, they need to acquire the readWriteReentrantLock of this processor
- * first. Therefore, to its clients, the class appears thread-safe.
- * <br/> The class has two backend tasks:
- *  (1) submit a flush job to flush data from the memtable to disk asynchronously;
- *  (2) close a tsfile and open a new one.
- * Users also need to acquire the write lock to call these two tasks.
- */
-public class TsFileProcessor extends Processor {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(TsFileProcessor.class);
-  public static final int WRITE_SUCCESS = 1;
-  public static final int WRITE_IN_WARNING_MEM = 0;
-  public static final int WRITE_REJECT_BY_TIME = -1;
-  public static final int WRITE_REJECT_BY_MEM = -2;
-
-  //this is just a part of fileSchemaRef: only the measurements that belong to this TsFileProcessor
-  // are in this fileSchemaRef. And, this field is shared with other classes (i.e., storage group
-  // processor), so be careful if you modify it.
-  private FileSchema fileSchemaRef;
-
-
-  private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
-  //when a flush task is running in the backend, it does not conflict with write operations
-  // (because a write operation modifies workMemtable, while the flush task uses flushMemtable).
-  // However, the flush task does conflict with query operations, because a query needs to read
-  // data from flushMemtable, and that table may be cleared when the flush operation ends. So, a
-  // Lock is needed.
-  private ReentrantLock flushQueryLock = new ReentrantLock();
-  private AtomicLong memSize = new AtomicLong();
-
-  //fileNamePrefix (system time, rather than data time), time unit: nanosecond.
-  // this is used for generating the new TsFile name
-  private long fileNamePrefix = System.nanoTime();
-  //the number of calls to the insertion function (between two flush operations).
-  private long valueCount = 0;
-
-
-  private IMemTable workMemTable;
-  private IMemTable flushMemTable;
-  private RestorableTsFileIOWriter writer;
-  private Action beforeFlushAction;
-  private Action afterCloseAction;
-  private Action afterFlushAction;
-  private File insertFile;
-  private TsFileResource currentResource;
-
-  private List<TsFileResource> tsFileResources;
-  private Map<String, List<TsFileResource>> inverseIndexOfResource;
-
-  private Map<String, Long> minWrittenTimeForEachDeviceInCurrentFile;
-  private Map<String, Long> maxWrittenTimeForEachDeviceInCurrentFile;
-  private Map<String, Long> lastFlushedTimeForEachDevice;
-
-  private WriteLogNode logNode;
-  private VersionController versionController;
-
-  /**
-   * constructor of TsFileProcessor. Data will be stored in the baseDir/processorName/ folder.
-   *
-   * @param processorName processor name
-   * @param fileSchemaRef file schema
-   * @throws BufferWriteProcessorException BufferWriteProcessorException
-   */
-  @SuppressWarnings({"squid:S2259", "squid:S3776"})
-  public TsFileProcessor(String processorName,
-      Action beforeFlushAction, Action afterFlushAction, Action afterCloseAction,
-      VersionController versionController,
-      FileSchema fileSchemaRef) throws BufferWriteProcessorException, IOException {
-    super(processorName);
-    this.fileSchemaRef = fileSchemaRef;
-    this.processorName = processorName;
-
-    this.beforeFlushAction = beforeFlushAction;
-    this.afterCloseAction = afterCloseAction;
-    this.afterFlushAction = afterFlushAction;
-    workMemTable = new PrimitiveMemTable();
-    tsFileResources = new ArrayList<>();
-    inverseIndexOfResource = new HashMap<>();
-    lastFlushedTimeForEachDevice = new HashMap<>();
-    minWrittenTimeForEachDeviceInCurrentFile = new HashMap<>();
-    maxWrittenTimeForEachDeviceInCurrentFile = new HashMap<>();
-    File unclosedFile = null;
-    String unclosedFileName = null;
-    int unclosedFileCount = 0;
-    for (String folderPath : Directories.getInstance().getAllTsFileFolders()) {
-      File dataFolder = new File(folderPath, processorName);
-      if (dataFolder.exists()) {
-        // we do not add the unclosed tsfile into tsFileResources.
-        File[] unclosedFiles = dataFolder
-            .listFiles(x -> x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX));
-        unclosedFileCount += unclosedFiles.length;
-        if (unclosedFileCount > 1) {
-          break;
-        } else if (unclosedFileCount == 1) {
-          unclosedFileName = unclosedFiles[0].getName()
-              .split(RestorableTsFileIOWriter.RESTORE_SUFFIX)[0];
-          unclosedFile = new File(unclosedFiles[0].getParentFile(), unclosedFileName);
-        }
-        File[] datas = dataFolder
-            .listFiles(x -> !x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX) && x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR).length == 2);
-        Arrays.sort(datas, Comparator.comparingLong(x -> Long.parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
-        for (File tsfile : datas) {
-          //TODO we'd better define a file suffix for TsFile, e.g., .ts
-          String[] names = tsfile.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR);
-          long time = Long.parseLong(names[0]);
-          if (fileNamePrefix < time) {
-            fileNamePrefix = time;
-          }
-          if (unclosedFileCount == 0 || !tsfile.getName().equals(unclosedFileName)) {
-            TsFileResource resource = new TsFileResource(tsfile, true);
-            tsFileResources.add(resource);
-            //maintain the inverse index and fileNamePrefix
-            for (String device : resource.getDevices()) {
-              inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>()).add(resource);
-              lastFlushedTimeForEachDevice
-                  .merge(device, resource.getEndTime(device), (x, y) -> x > y ? x : y);
-            }
-          }
-        }
-      } else {
-        //processor folder does not exist
-        dataFolder.mkdirs();
-      }
-    }
-    if (unclosedFileCount > 1) {
-      throw new BufferWriteProcessorException(String
-          .format("TsProcessor %s has more than one unclosed TsFile. please repair it",
-              processorName));
-    } else if (unclosedFileCount == 0){
-      unclosedFile = generateNewTsFilePath();
-    }
-
-    initCurrentTsFile(unclosedFile);
-
-    this.versionController = versionController;
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-      try {
-        logNode = MultiFileLogNodeManager.getInstance().getNode(
-            processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
-            writer.getRestoreFilePath(),
-            FileNodeManager.getInstance().getRestoreFilePath(processorName));
-      } catch (IOException e) {
-        throw new BufferWriteProcessorException(e);
-      }
-    }
-  }
-
-
-  private File generateNewTsFilePath() throws BufferWriteProcessorException {
-    String dataDir = Directories.getInstance().getNextFolderForTsfile();
-    File dataFolder = new File(dataDir, processorName);
-    if (!dataFolder.exists()) {
-      if (!dataFolder.mkdirs()) {
-        throw new BufferWriteProcessorException(
-            String.format("Can not create TsFileProcess related folder: %s", dataFolder));
-      }
-      LOGGER.debug("The bufferwrite processor data dir doesn't exists, create new directory {}.",
-          dataFolder.getAbsolutePath());
-    }
-    String fileName = (fileNamePrefix + 1) + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
-        + System.currentTimeMillis();
-    return new File(dataFolder, fileName);
-  }
-
-
-  private void initCurrentTsFile(File file) throws BufferWriteProcessorException {
-    this.insertFile = file;
-    try {
-      writer = new RestorableTsFileIOWriter(processorName, insertFile.getAbsolutePath());
-      this.currentResource = new TsFileResource(insertFile, writer);
-    } catch (IOException e) {
-      throw new BufferWriteProcessorException(e);
-    }
-
-    minWrittenTimeForEachDeviceInCurrentFile.clear();
-    maxWrittenTimeForEachDeviceInCurrentFile.clear();
-
-  }
-
-  /**
-   * write a TSRecord into the memtable. If the memory usage is beyond the memThreshold, an async
-   * flushing operation will be called.
-   *
-   * @param plan data to be written
-   * @return - 1 (WRITE_SUCCESS) if the tsRecord can be inserted into tsFile.
-   * - 0 (WRITE_IN_WARNING_MEM) if the memory is UsageLevel.WARNING
-   * - -1 (WRITE_REJECT_BY_TIME) if you need to insert it into overflow
-   * - -2 (WRITE_REJECT_BY_MEM) if the memory is UsageLevel.DANGEROUS
-   * @throws BufferWriteProcessorException if a flush operation occurs and fails.
-   */
-  public int insert(InsertPlan plan) throws BufferWriteProcessorException, IOException {
-    if (lastFlushedTimeForEachDevice.containsKey(plan.getDeviceId()) && plan.getTime() <= lastFlushedTimeForEachDevice.get(plan.getDeviceId())) {
-      return WRITE_REJECT_BY_TIME;
-    }
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-      logNode.write(plan);
-    }
-    long memUsage = 0;
-    TSDataType type;
-    String measurement;
-    for (int i = 0; i < plan.getMeasurements().length; i++) {
-      measurement = plan.getMeasurements()[i];
-      type = fileSchemaRef.getMeasurementDataType(measurement);
-      memUsage += MemUtils.getPointSize(type, measurement);
-    }
-    UsageLevel level = BasicMemController.getInstance().acquireUsage(this, memUsage);
-    switch (level) {
-      case SAFE:
-        doInsert(plan);
-        checkMemThreshold4Flush(memUsage);
-        return WRITE_SUCCESS;
-      case WARNING:
-        if (LOGGER.isWarnEnabled()) {
-          LOGGER.warn("Memory usage will exceed warning threshold, current : {}.",
-              MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
-        }
-        doInsert(plan);
-        try {
-          flush();
-        } catch (IOException e) {
-          throw new BufferWriteProcessorException(e);
-        }
-        return WRITE_IN_WARNING_MEM;
-      case DANGEROUS:
-        if (LOGGER.isWarnEnabled()) {
-          LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.",
-              MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
-        }
-        return WRITE_REJECT_BY_MEM;
-      default:
-        return WRITE_REJECT_BY_MEM;
-    }
-  }
-
-  private void doInsert(InsertPlan plan) {
-    String deviceId = plan.getDeviceId();
-    long time = plan.getTime();
-    TSDataType type;
-    String measurement;
-    for (int i = 0; i < plan.getMeasurements().length; i++) {
-      measurement = plan.getMeasurements()[i];
-      type = fileSchemaRef.getMeasurementDataType(measurement);
-      workMemTable.write(deviceId, measurement, type, time, plan.getValues()[i]);
-    }
-    if (!minWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)) {
-      minWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
-    }
-    if (!maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId) || maxWrittenTimeForEachDeviceInCurrentFile
-        .get(deviceId) < time) {
-      maxWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
-    }
-    valueCount++;
-  }
-
-  /**
-   * Delete data whose timestamp <= 'timestamp' and which belongs to the timeseries
-   * deviceId.measurementId. Delete data in both the working MemTable and the flushing MemTable.
-   *
-   * @param deviceId the deviceId of the timeseries to be deleted.
-   * @param measurementId the measurementId of the timeseries to be deleted.
-   * @param timestamp the upper-bound of deletion time.
-   */
-  public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
-    workMemTable.delete(deviceId, measurementId, timestamp);
-    if (maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId) && maxWrittenTimeForEachDeviceInCurrentFile
-        .get(deviceId) < timestamp) {
-      maxWrittenTimeForEachDeviceInCurrentFile
-          .put(deviceId, lastFlushedTimeForEachDevice.getOrDefault(deviceId, 0L));
-    }
-    boolean deleteFlushTable = false;
-    if (isFlush()) {
-      // flushing MemTable cannot be directly modified since another thread is reading it
-      flushMemTable = flushMemTable.copy();
-      deleteFlushTable = flushMemTable.delete(deviceId, measurementId, timestamp);
-    }
-    String fullPath = deviceId +
-        IoTDBConstant.PATH_SEPARATOR + measurementId;
-    Deletion deletion = new Deletion(fullPath, versionController.nextVersion(), timestamp);
-    if (deleteFlushTable || (currentResource.containsDevice(deviceId) && currentResource.getStartTime(deviceId) <= timestamp)) {
-      currentResource.getModFile().write(deletion);
-    }
-    for (TsFileResource resource : tsFileResources) {
-      if (resource.containsDevice(deviceId) && resource.getStartTime(deviceId) <= timestamp) {
-        resource.getModFile().write(deletion);
-      }
-    }
-    if (lastFlushedTimeForEachDevice.containsKey(deviceId) && lastFlushedTimeForEachDevice.get(deviceId) <= timestamp) {
-      lastFlushedTimeForEachDevice.put(deviceId, 0L);
-    }
-  }
-
-
-  private void checkMemThreshold4Flush(long addedMemory) throws BufferWriteProcessorException {
-    long newMem = memSize.addAndGet(addedMemory);
-    if (newMem > TSFileConfig.groupSizeInByte) {
-      if (LOGGER.isInfoEnabled()) {
-        String usageMem = MemUtils.bytesCntToStr(newMem);
-        String threshold = MemUtils.bytesCntToStr(TSFileConfig.groupSizeInByte);
-        LOGGER.info("The usage of memory {} in bufferwrite processor {} reaches the threshold {}",
-            usageMem, processorName, threshold);
-      }
-      try {
-        flush();
-      } catch (IOException e) {
-        LOGGER.error("Flush bufferwrite error.", e);
-        throw new BufferWriteProcessorException(e);
-      }
-    }
-  }
-
-
-  /**
-   * this method prepares a flush task and then submits it.
-   * @return a Future of the flush task
-   * @throws IOException if the pre-flush preparation fails
-   */
-  @Override
-  public Future<Boolean> flush() throws IOException {
-    // waiting for the end of last flush operation.
-    try {
-      flushFuture.get();
-    } catch (InterruptedException | ExecutionException e) {
-      LOGGER.error(
-          "Encounter an interrupt error when waitting for the flushing, the TsFile Processor is {}.",
-          getProcessorName(), e);
-      Thread.currentThread().interrupt();
-    }
-    // statistic information for flush
-    if (valueCount <= 0) {
-      LOGGER.debug(
-          "TsFile Processor {} has zero data to be flushed, will return directly.", processorName);
-      flushFuture = new ImmediateFuture<>(true);
-      return flushFuture;
-    }
-
-    if (LOGGER.isInfoEnabled()) {
-      long thisFlushTime = System.currentTimeMillis();
-      LOGGER.info(
-          "The TsFile Processor {}: last flush time is {}, this flush time is {}, "
-              + "flush time interval is {} s", getProcessorName(),
-          DatetimeUtils.convertMillsecondToZonedDateTime(fileNamePrefix / 1000),
-          DatetimeUtils.convertMillsecondToZonedDateTime(thisFlushTime),
-          (thisFlushTime - fileNamePrefix / 1000) / 1000);
-    }
-    fileNamePrefix = System.nanoTime();
-
-    // update the lastUpdatetime, prepare for flush
-    try {
-      beforeFlushAction.act();
-    } catch (Exception e) {
-      LOGGER.error("Failed to flush memtable into tsfile when calling the action function.");
-      throw new IOException(e);
-    }
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-      logNode.notifyStartFlush();
-    }
-    valueCount = 0;
-    switchWorkToFlush();
-    long version = versionController.nextVersion();
-    BasicMemController.getInstance().releaseUsage(this, memSize.get());
-    memSize.set(0);
-    // switch
-    flushFuture = FlushManager.getInstance().submit(() -> flushTask(version));
-    return flushFuture;
-  }
-
-  /**
-   * this method may run concurrently with other methods.
-   *
-   * @param version the operation version that will be tagged on the memtable to be flushed (i.e.,
-   * the ChunkGroup)
-   * @return true if successful.
-   */
-  private boolean flushTask(long version) {
-    boolean result;
-    long flushStartTime = System.currentTimeMillis();
-    LOGGER.info("The TsFile Processor {} starts flushing.", processorName);
-    try {
-      if (flushMemTable != null && !flushMemTable.isEmpty()) {
-        // flush data
-        MemTableFlushUtil.flushMemTable(fileSchemaRef, writer, flushMemTable,
-            version);
-        // write restore information
-        writer.flush();
-      } else {
-        //no need to flush.
-        return true;
-      }
-
-      afterFlushAction.act();
-      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-        logNode.notifyEndFlush(null);
-      }
-      result = true;
-    } catch (Exception e) {
-      LOGGER.error(
-          "The TsFile Processor {} failed to flush, when calling the afterFlushAction(filenodeFlushAction).",
-          processorName, e);
-      result = false;
-    } finally {
-      try {
-        switchFlushToWork();
-      } catch (BufferWriteProcessorException e) {
-        LOGGER.error(e.getMessage());
-        result = false;
-      }
-      LOGGER.info("The TsFile Processor {} ends flushing.", processorName);
-    }
-    if (LOGGER.isInfoEnabled()) {
-      long flushEndTime = System.currentTimeMillis();
-      LOGGER.info(
-          "The TsFile Processor {} flush, start time is {}, flush end time is {}, "
-              + "flush time consumption is {}ms",
-          processorName,
-          DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
-          DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
-          flushEndTime - flushStartTime);
-    }
-    return result;
-  }
-
-  private void switchWorkToFlush() {
-    flushQueryLock.lock();
-    try {
-      if (flushMemTable == null) {
-        flushMemTable = workMemTable;
-        for (String device : flushMemTable.getMemTableMap().keySet()) {
-          lastFlushedTimeForEachDevice.put(device, maxWrittenTimeForEachDeviceInCurrentFile.get(device));
-        }
-        workMemTable = new PrimitiveMemTable();
-      }
-    } finally {
-      flushQueryLock.unlock();
-    }
-  }
-
-  private void switchFlushToWork() throws BufferWriteProcessorException {
-    flushQueryLock.lock();
-    try {
-      //we update the index of currentTsResource.
-      for (String device : flushMemTable.getMemTableMap().keySet()) {
-        currentResource.setStartTime(device, minWrittenTimeForEachDeviceInCurrentFile.get(device));
-        // new end time must be larger than the old one.
-        currentResource.setEndTime(device, lastFlushedTimeForEachDevice.get(device));
-      }
-      flushMemTable.clear();
-      flushMemTable = null;
-      //make chunk groups in this flush task visible
-      writer.appendMetadata();
-      if (needCloseCurrentFile()) {
-        closeCurrentTsFileAndOpenNewOne();
-      }
-    } finally {
-      flushQueryLock.unlock();
-    }
-  }
-
-  @Override
-  public boolean canBeClosed() {
-    return true;
-  }
-
-
-  /**
-   * this method does not call flush() to flush data in memory to disk.
-   * @throws BufferWriteProcessorException
-   */
-  private void closeCurrentTsFileAndOpenNewOne() throws BufferWriteProcessorException {
-    closeCurrentFile();
-    initCurrentTsFile(generateNewTsFilePath());
-  }
-
-  // very dangerous: how do we make sure this function is thread safe (no other functions running)?
-  private void closeCurrentFile() throws BufferWriteProcessorException {
-    try {
-      long closeStartTime = System.currentTimeMillis();
-      // end file
-      if (writer.getChunkGroupMetaDatas().isEmpty()) {
-        //this is an empty TsFile, we do not need to save it...
-        writer.endFile(fileSchemaRef);
-        Files.delete(Paths.get(insertFile.getAbsolutePath()));
-      } else {
-        writer.endFile(fileSchemaRef);
-        // update the IntervalFile for interval list
-        afterCloseAction.act();
-        // flush the changed information for filenode
-        afterFlushAction.act();
-
-        tsFileResources.add(currentResource);
-        //maintain the inverse index
-        for (String device : currentResource.getDevices()) {
-          inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>())
-              .add(currentResource);
-        }
-      }
-
-      // delete the restore for this bufferwrite processor
-      if (LOGGER.isInfoEnabled()) {
-        long closeEndTime = System.currentTimeMillis();
-        LOGGER.info(
-            "Close current TsFile {}, start time is {}, end time is {}, time consumption is {}ms",
-            insertFile.getAbsolutePath(),
-            DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
-            DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
-            closeEndTime - closeStartTime);
-      }
-
-      workMemTable.clear();
-      if (flushMemTable != null) {
-        flushMemTable.clear();
-      }
-      if (logNode != null) {
-        logNode.close();
-      }
-
-    } catch (IOException e) {
-      LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.",
-          getProcessorName(), e);
-      throw new BufferWriteProcessorException(e);
-    } catch (Exception e) {
-      LOGGER
-          .error("Failed to close the bufferwrite processor when calling the action function.", e);
-      throw new BufferWriteProcessorException(e);
-    }
-  }
-
-  @Override
-  public long memoryUsage() {
-    return 0;
-  }
-
-  /**
-   * check if is flushing.
-   *
-   * @return True if flushing
-   */
-  public boolean isFlush() {
-    // starting a flush task has two steps: set the flushMemtable, and then set the flushFuture.
-    // So, the following case exists: flushMemtable != null but flushFuture is done (because the
-    // flushFuture refers to the last finished flush).
-    // And, the following case exists, too: flushMemtable == null, but flushFuture is not done
-    // (flushTask() is not finished, but switchFlushToWork() has been done).
-    // So, checking flushMemTable is more meaningful than flushFuture.isDone().
-    return flushMemTable != null;
-  }
-
-
-  /**
-   * query data.
-   */
-  public GlobalSortedSeriesDataSource query(SingleSeriesExpression expression,
-      QueryContext context) throws IOException {
-    MeasurementSchema mSchema;
-    TSDataType dataType;
-
-    String deviceId = expression.getSeriesPath().getDevice();
-    String measurementId = expression.getSeriesPath().getMeasurement();
-
-    mSchema = fileSchemaRef.getMeasurementSchema(measurementId);
-    dataType = mSchema.getType();
-
-    // tsfile data
-    List<TsFileResource> dataFiles = new ArrayList<>();
-    for (TsFileResource tsfile : tsFileResources) {
-      //TODO in the old version, tsfile is deep copied. I do not know why
-      dataFiles.add(tsfile);
-    }
-    // bufferwrite data
-    //TODO unsealedTsFile class is a little redundant.
-    UnsealedTsFile unsealedTsFile = null;
-
-    if (currentResource.getStartTime(deviceId) >= 0) {
-      unsealedTsFile = new UnsealedTsFile();
-      unsealedTsFile.setFilePath(currentResource.getFile().getAbsolutePath());
-      List<ChunkMetaData> chunks = writer.getMetadatas(deviceId, measurementId, dataType);
-      List<Modification> pathModifications = context.getPathModifications(
-          currentResource.getModFile(), deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId);
-      if (!pathModifications.isEmpty()) {
-        QueryUtils.modifyChunkMetaData(chunks, pathModifications);
-      }
-      unsealedTsFile.setTimeSeriesChunkMetaDatas(chunks);
-    }
-    return new GlobalSortedSeriesDataSource(
-        new Path(deviceId + "." + measurementId), dataFiles, unsealedTsFile,
-        queryDataInMemtable(deviceId, measurementId, dataType, mSchema.getProps()));
-  }
-
-  /**
-   * get the chunk in the working memtable (and, if a flush is running, the one in the flushing
-   * memtable, compacting the two into one TimeValuePairSorter). Then get its ChunkMetadata(s).
-   *
-   * @param deviceId device id
-   * @param measurementId sensor id
-   * @param dataType data type
-   * @return corresponding chunk data in memory
-   */
-  private ReadOnlyMemChunk queryDataInMemtable(String deviceId,
-      String measurementId, TSDataType dataType, Map<String, String> props) {
-    flushQueryLock.lock();
-    try {
-      MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
-      if (flushMemTable != null) {
-        memSeriesLazyMerger.addMemSeries(flushMemTable.query(deviceId, measurementId, dataType, props));
-      }
-      memSeriesLazyMerger.addMemSeries(workMemTable.query(deviceId, measurementId, dataType, props));
-      // memSeriesLazyMerger has handled the props,
-      // so we do not need to handle it again in the following readOnlyMemChunk
-      return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger, Collections.emptyMap());
-    } finally {
-      flushQueryLock.unlock();
-    }
-  }
-
-
-  public String getInsertFilePath() {
-    return insertFile.getAbsolutePath();
-  }
-
-  public WriteLogNode getLogNode() {
-    return logNode;
-  }
-
-  /**
-   * used for test. We can know when the flush() is called.
-   *
-   * @return the last flush() time. Time unit: nanosecond.
-   */
-  public long getFileNamePrefix() {
-    return fileNamePrefix;
-  }
-
-  /**
-   * used for test. We can block to wait for the flush to finish.
-   *
-   * @return the future of the flush() task.
-   */
-  public Future<Boolean> getFlushFuture() {
-    return flushFuture;
-  }
-
-  @Override
-  public void close() throws BufferWriteProcessorException {
-    closeCurrentFile();
-    try {
-      if (currentResource != null) {
-        currentResource.close();
-      }
-      for (TsFileResource resource : tsFileResources) {
-        resource.close();
-      }
-
-    } catch (IOException e) {
-      throw new BufferWriteProcessorException(e);
-    }
-  }
-
-  /**
-   * remove all data of this processor. Used for UT.
-   */
-  public void removeMe() throws BufferWriteProcessorException, IOException {
-    try {
-      flushFuture.get(10000, TimeUnit.MILLISECONDS);
-    } catch (ExecutionException | TimeoutException e) {
-      LOGGER.error("can not end running flush task in 10 seconds: {}", e.getMessage());
-    } catch (InterruptedException e) {
-      LOGGER.error("current running flush task is interrupted.", e);
-      Thread.currentThread().interrupt();
-    }
-    close();
-    for (String folder : Directories.getInstance().getAllTsFileFolders()) {
-      File dataFolder = new File(folder, processorName);
-      if (dataFolder.exists()) {
-        for (File file: dataFolder.listFiles()) {
-          Files.deleteIfExists(Paths.get(file.getAbsolutePath()));
-        }
-      }
-    }
-  }
-
-  /**
-   * Check if the currentTsFileResource is too large.
-   * @return true if the file is too large.
-   */
-  private boolean needCloseCurrentFile() {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    long fileSize = currentResource.getFile().length();
-    if (fileSize >= config.getBufferwriteFileSizeThreshold()) {
-      if (LOGGER.isInfoEnabled()) {
-        LOGGER.info(
-            "The bufferwrite processor {}, size({}) of the file {} reaches threshold {}.",
-            processorName, MemUtils.bytesCntToStr(fileSize), currentResource.getFilePath(),
-            MemUtils.bytesCntToStr(config.getBufferwriteFileSizeThreshold()));
-      }
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof TsFileProcessor)) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
-    TsFileProcessor that = (TsFileProcessor) o;
-    return fileNamePrefix == that.fileNamePrefix &&
-        valueCount == that.valueCount &&
-        Objects.equals(fileSchemaRef, that.fileSchemaRef) &&
-        Objects.equals(flushFuture, that.flushFuture) &&
-        Objects.equals(flushQueryLock, that.flushQueryLock) &&
-        Objects.equals(memSize, that.memSize) &&
-        Objects.equals(workMemTable, that.workMemTable) &&
-        Objects.equals(flushMemTable, that.flushMemTable) &&
-        Objects.equals(writer, that.writer) &&
-        Objects.equals(beforeFlushAction, that.beforeFlushAction) &&
-        Objects.equals(afterCloseAction, that.afterCloseAction) &&
-        Objects.equals(afterFlushAction, that.afterFlushAction) &&
-        Objects.equals(insertFile, that.insertFile) &&
-        Objects.equals(currentResource, that.currentResource) &&
-        Objects.equals(tsFileResources, that.tsFileResources) &&
-        Objects.equals(inverseIndexOfResource, that.inverseIndexOfResource) &&
-        Objects.equals(lastFlushedTimeForEachDevice, that.lastFlushedTimeForEachDevice) &&
-        Objects.equals(logNode, that.logNode) &&
-        Objects.equals(versionController, that.versionController);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects
-        .hash(super.hashCode(), fileSchemaRef, flushFuture, flushQueryLock, memSize, fileNamePrefix,
-            valueCount, workMemTable, flushMemTable, writer, beforeFlushAction, afterCloseAction,
-            afterFlushAction, insertFile, currentResource, tsFileResources, inverseIndexOfResource,
-            lastFlushedTimeForEachDevice, logNode, versionController);
-  }
-}
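
For reference, the deleted insert() method above returns one of four codes. A minimal
caller-side sketch of dispatching on them (assuming the deleted TsFileProcessor API above;
'overflowProcessor' is a hypothetical stand-in for the overflow path the javadoc mentions):

    // Hedged sketch: handling the insert() return codes documented above.
    int code = processor.insert(plan);
    switch (code) {
      case TsFileProcessor.WRITE_SUCCESS:
      case TsFileProcessor.WRITE_IN_WARNING_MEM: // accepted; a flush was also triggered
        break;
      case TsFileProcessor.WRITE_REJECT_BY_TIME:
        // plan.getTime() <= last flushed time for this device; route to overflow instead
        overflowProcessor.insert(plan); // hypothetical handler, not IoTDB API
        break;
      case TsFileProcessor.WRITE_REJECT_BY_MEM:
        // memory is at UsageLevel.DANGEROUS; back off and retry later
        break;
      default:
        throw new IllegalStateException("unexpected insert result: " + code);
    }
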
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index d39fa39..e4ef67e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -197,26 +196,4 @@ public class QueryResourceManager {
     private QueryTokenManagerHelper() {
     }
   }
-
-
-  /**
-   * TODO
-   * This is only for testing TsFileProcessor now. This method will eventually be replaced when
-   * fileNodeManager is refactored.
-   */
-  public QueryDataSource getQueryDataSourceByTsFileProcessor(Path selectedPath,
-      QueryContext context, TsFileProcessor processor)
-      throws IOException, FileNodeManagerException {
-    OverflowSeriesDataSource overflowSeriesDataSource = new OverflowSeriesDataSource(selectedPath);
-    overflowSeriesDataSource.setOverflowInsertFileList(Collections.EMPTY_LIST);
-
-    SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(selectedPath, null);
-    GlobalSortedSeriesDataSource dataSource = processor.query(singleSeriesExpression, context);
-    QueryDataSource queryDataSource = new QueryDataSource(dataSource, overflowSeriesDataSource);
-    // add used files to current thread request cached map
-    filePathsManager.addUsedFilesForGivenJob(context.getJobId(), queryDataSource);
-
-    return queryDataSource;
-
-  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
index d7829ba..07073cd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
@@ -162,61 +161,4 @@ public class EngineExecutorWithoutTimeGenerator {
       throw new FileNodeManagerException(e);
     }
   }
-
-  /**
-   * TODO
-   * This is only for testing TsFileProcessor now. This method will eventually be replaced when
-   * fileNodeManager is refactored.
-   */
-  public QueryDataSet executeWithoutFilter(QueryContext context, TsFileProcessor processor)
-      throws FileNodeManagerException, IOException {
-
-    List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
-    List<TSDataType> dataTypes = new ArrayList<>();
-
-    QueryResourceManager.getInstance()
-        .beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries());
-
-    for (Path path : queryExpression.getSelectedSeries()) {
-
-      QueryDataSource queryDataSource = QueryResourceManager.getInstance()
-          .getQueryDataSourceByTsFileProcessor(path, context, processor);
-
-      // add data type
-      try {
-        dataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
-      } catch (PathErrorException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // sequence insert data
-      SequenceDataReader tsFilesReader;
-      try {
-        tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
-            null, context);
-      } catch (IOException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // unseq insert data
-      PriorityMergeReader unSeqMergeReader;
-      try {
-        unSeqMergeReader = SeriesReaderFactory.getInstance()
-            .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
-      } catch (IOException e) {
-        throw new FileNodeManagerException(e);
-      }
-
-      // merge sequence data with unsequence data.
-      readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader));
-    }
-
-    try {
-      return new EngineDataSetWithoutTimeGenerator(queryExpression.getSelectedSeries(), dataTypes,
-          readersOfSelectedSeries);
-    } catch (IOException e) {
-      throw new FileNodeManagerException(e);
-    }
-  }
-
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 9143026..96ad5ad 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -31,7 +30,6 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.groupby.GroupByWithOnlyTimeFilterDataSetDataSet;
 import org.apache.iotdb.db.query.executor.groupby.GroupByWithValueFilterDataSetDataSet;
 import org.apache.iotdb.db.query.fill.IFill;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -110,24 +108,6 @@ public class EngineQueryRouter {
   }
 
   /**
-   * TODO
-   * This is only for testing TsFileProcessor now. This method will eventually be replaced when
-   * fileNodeManager is refactored.
-   */
-  public QueryDataSet query(QueryExpression queryExpression, TsFileProcessor processor,
-      QueryContext context)
-      throws FileNodeManagerException, IOException {
-
-    if (queryExpression.hasQueryFilter()) {
-      throw new NotImplementedException("this function is just for test...");
-    } else {
-      EngineExecutorWithoutTimeGenerator engineExecutor = new EngineExecutorWithoutTimeGenerator(
-          queryExpression);
-      return engineExecutor.executeWithoutFilter(context, processor);
-    }
-  }
-
-  /**
    * execute groupBy query.
    *
    * @param selectedSeries select path list
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
deleted file mode 100644
index 22d815c..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.tsfiledata;
-
-import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
-import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.bufferwrite.Action;
-import org.apache.iotdb.db.engine.bufferwrite.ActionException;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.FileNodeProcessorException;
-import org.apache.iotdb.db.exception.MetadataArgsErrorException;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.executor.EngineQueryRouter;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.utils.ImmediateFuture;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TsFileProcessorTest {
-  private static Logger LOGGER = LoggerFactory.getLogger(TsFileProcessorTest.class);
-  TsFileProcessor processor;
-  MManager mManager;
-  EngineQueryRouter queryManager;
-  Action doNothingAction = new Action() {
-    @Override
-    public void act() throws ActionException {
-    }
-  };
-  Map<String, MeasurementSchema> measurementSchemaMap = new HashMap<>();
-
-  FileSchema schema;
-
-  long oldBufferwriteFileSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold();
-  @Before
-  public void setUp() throws Exception {
-    EnvironmentUtils.cleanEnv();
-    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
-    TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
-//  now we do not support wal because it needs modifications to the wal module.
-//  IoTDBDescriptor.getInstance().getConfig().setEnableWal(true);
-    IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(2*1024*1024);
-    mManager = MManager.getInstance();
-    queryManager = new EngineQueryRouter();
-    measurementSchemaMap.put("s1", new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
-    measurementSchemaMap.put("s2", new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
-    measurementSchemaMap.put("s3", new MeasurementSchema("s3", TSDataType.FLOAT, TSEncoding.RLE));
-    schema = new FileSchema(measurementSchemaMap);
-    processor = new TsFileProcessor("root.test", doNothingAction, doNothingAction, doNothingAction,
-        SysTimeVersionController.INSTANCE, schema);
-    mManager.setStorageLevelToMTree("root.test");
-    mManager.addPathToMTree("root.test.d1.s1",  TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
-    mManager.addPathToMTree("root.test.d2.s1",  TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
-    mManager.addPathToMTree("root.test.d1.s2",  TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
-    mManager.addPathToMTree("root.test.d2.s2",  TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
-  }
-
-
-  @After
-  public void tearDown() throws Exception {
-    //processor.close();
-    // processor.writeLock();
-    processor.removeMe();
-    EnvironmentUtils.cleanEnv();
-    IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
-    IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(oldBufferwriteFileSizeThreshold);
-  }
-
-  @Test
-  public void insert()
-      throws BufferWriteProcessorException, IOException, ExecutionException, InterruptedException, FileNodeProcessorException, FileNodeManagerException, PathErrorException, MetadataArgsErrorException {
-    String[] s1 = new String[]{"s1"};
-    String[] s2 = new String[]{"s2"};
-    String[] value = new String[]{"5.0"};
-    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1",  10, s1, value)));
-    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1",  10, s2, value)));
-    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1",  12, s1, value)));
-    Future<Boolean> ok = processor.flush();
-    ok.get();
-    ok = processor.flush();
-    Assert.assertTrue(ok instanceof ImmediateFuture);
-    ok.get();
-    ok = processor.flush();
-    Assert.assertTrue(ok instanceof ImmediateFuture);
-    ok.get();
-
-    // let's rewrite timestamp = 12 again.
-    Assert.assertEquals(TsFileProcessor.WRITE_REJECT_BY_TIME, processor.insert(new InsertPlan("root.test.d1",  12, s1, value)));
-    processor.delete("root.test.d1", "s1",12);
-    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1",  12, s1, value)));
-    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1",  13, s1, value)));
-    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d2",  10, s1, value)));
-    Assert.assertEquals(TsFileProcessor.WRITE_SUCCESS, processor.insert(new InsertPlan("root.test.d1",  14, s1, value)));
-    processor.delete("root.test.d1", "s1",12);
-    processor.delete("root.test.d3", "s1",12);
-
-
-    QueryExpression qe = QueryExpression.create(Collections.singletonList(new Path("root.test.d1", "s1")), null);
-    QueryDataSet result = queryManager.query(qe, processor, TEST_QUERY_CONTEXT);
-    while (result.hasNext()) {
-      RowRecord record = result.next();
-      System.out.println(record.getTimestamp() +"," + record.getFields().get(0).getFloatV());
-    }
-  }
-
-
-
-  @Test
-  public void bruteForceTest() throws InterruptedException, FileNodeManagerException, IOException {
-
-    String[] devices = new String[] {"root.test.d1", "root.test.d2"};
-    String[] sensors = new String[] {"s1", "s2"};
-    final boolean[] exception = {false, false, false};
-    final boolean[] goon = {true};
-    int totalsize = 50000;
-
-    final int[] count = {0};
-    QueryExpression qe = QueryExpression.create(Collections.singletonList(new Path("root.test.d1", "s1")), null);
-    Thread insertThread = new Thread() {
-      @Override
-      public void run() {
-        long time = 100L;
-        long start = System.currentTimeMillis();
-        String[] sensors = new String[]{"s1"};
-        String[] values = new String[1];
-        try {
-          for (int j = 0; j < totalsize  && goon[0]; j++) {
-            processor.lock(true);
-//            processor.insert("root.test.d1","s1", time++,  String.valueOf(j));
-//            processor.insert("root.test.d2","s1", time++,  String.valueOf(j));
-            values[0] = String.valueOf(j);
-            processor.insert(new InsertPlan("root.test.d1",  time++, sensors, values));
-            processor.insert(new InsertPlan("root.test.d2",  time++, sensors, values));
-            processor.writeUnlock();
-            count[0]++;
-          }
-          System.out.println((System.currentTimeMillis() - start));
-        } catch (BufferWriteProcessorException | IOException e) {
-          // we will break out.
-          LOGGER.error(e.getMessage());
-          exception[0] = true;
-        }
-      }
-    };
-    Thread flushThread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          for (int j = 0; j < totalsize * 2 && goon[0]; j++) {
-            processor.lock(true);
-            processor.flush();
-            processor.writeUnlock();
-          }
-        } catch (IOException e) {
-          // we will break out.
-          LOGGER.error(e.getMessage());
-          exception[1] = true;
-        }
-      }
-    };
-    //we temporarily disable the query because there are bugs.
-    Thread queryThread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          for (int j = 0; j < totalsize * 2 && goon[0]; j++) {
-            processor.lock(false);
-            QueryContext context = new QueryContext(QueryResourceManager.getInstance().assignJobId());
-            QueryDataSet result = queryManager.query(qe, processor, context);
-            while (result.hasNext()) {
-              result.next();
-            }
-            QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
-            processor.readUnlock();
-          }
-        } catch (IOException | FileNodeManagerException e) {
-          // we will break out.
-          LOGGER.error(e.getMessage());
-          exception[2] = true;
-        }
-      }
-    };
-    flushThread.start();
-    insertThread.start();
-    queryThread.start();
-    //wait at most 20 seconds.
-    insertThread.join(20000);
-    goon[0] = false;
-    //queryThread.join(5000);
-    Assert.assertFalse(exception[0]);
-    Assert.assertFalse(exception[1]);
-    Assert.assertFalse(exception[2]);
-    synchronized (this) {
-      while (queryThread.isAlive()) {
-        this.wait(50);
-      }
-    }
-    QueryDataSet result = queryManager.query(qe, processor, TEST_QUERY_CONTEXT);
-    int size = 0;
-    while (result.hasNext()) {
-      result.next();
-      size++;
-    }
-    //Assert.assertEquals(count[0], size);
-  }
-}
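
The brute-force test above exercises the work/flush memtable double buffer that the deleted
processor implements in switchWorkToFlush(), switchFlushToWork() and queryDataInMemtable().
A self-contained sketch of that pattern, with a plain map standing in for the memtable (all
names here are illustrative, not IoTDB API):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantLock;

    // Writers fill 'work'; a flush task drains 'flushing'; queries merge both under a
    // short lock, mirroring the role of flushQueryLock in the deleted TsFileProcessor.
    class DoubleBufferedMemtable {
      private final ReentrantLock flushQueryLock = new ReentrantLock();
      private Map<String, List<Long>> work = new HashMap<>();
      private Map<String, List<Long>> flushing; // null when no flush is running

      void write(String series, long value) {
        work.computeIfAbsent(series, k -> new ArrayList<>()).add(value);
      }

      // Flush step 1: swap buffers so writers never touch the table being flushed.
      Map<String, List<Long>> switchWorkToFlush() {
        flushQueryLock.lock();
        try {
          flushing = work;
          work = new HashMap<>();
          return flushing;
        } finally {
          flushQueryLock.unlock();
        }
      }

      // Flush step 2, after the data is durable: drop the flushing table.
      void switchFlushToWork() {
        flushQueryLock.lock();
        try {
          flushing = null;
        } finally {
          flushQueryLock.unlock();
        }
      }

      // Queries must see both buffers; the lock keeps 'flushing' from being cleared mid-read.
      List<Long> query(String series) {
        flushQueryLock.lock();
        try {
          List<Long> result = new ArrayList<>();
          if (flushing != null) {
            result.addAll(flushing.getOrDefault(series, Collections.emptyList()));
          }
          result.addAll(work.getOrDefault(series, Collections.emptyList()));
          return result;
        } finally {
          flushQueryLock.unlock();
        }
      }
    }
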
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java
deleted file mode 100644
index ceaebb0..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/QueryDataFromUnclosedTsFileIT.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.integration;
-
-
-import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
-import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
-
-import java.io.IOException;
-import java.util.Collections;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
-import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.MetadataArgsErrorException;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.executor.EngineQueryRouter;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class QueryDataFromUnclosedTsFileIT {
-
-  long bufferWriteFileSize;
-  FileNodeManager sgManager;
-  MManager mManager;
-  EngineQueryRouter queryManager;
-  @Before
-  public void setUp() throws IOException, FileNodeManagerException {
-    EnvironmentUtils.cleanEnv();
-    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
-    TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
-    bufferWriteFileSize = IoTDBDescriptor.getInstance().getConfig().getBufferwriteFileSizeThreshold();
-    //IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(100);
-    sgManager  = FileNodeManager.getInstance();
-    mManager = MManager.getInstance();
-    queryManager = new EngineQueryRouter();
-  }
-
-  @After
-  public void tearDown() throws FileNodeManagerException, IOException {
-    IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(bufferWriteFileSize);
-    //sgManager.deleteAll();
-    //mManager.clear();
-    EnvironmentUtils.cleanEnv();
-
-  }
-
-  @Test
-  public void test()
-      throws FileNodeManagerException, IOException, PathErrorException, MetadataArgsErrorException {
-    //Path path, TSDataType dataType, TSEncoding encoding, CompressionType compressor,
-    //  Map<String, String> props
-    mManager.setStorageLevelToMTree("root.test");
-    mManager.addPathToMTree("root.test.d1.s1",  TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
-    mManager.addPathToMTree("root.test.d2.s1",  TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY, Collections.emptyMap());
-    sgManager.addTimeSeries(new Path("root.test.d1", "s1"), TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY, Collections
-        .emptyMap());
-    sgManager.addTimeSeries(new Path("root.test.d2", "s1"), TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY, Collections
-        .emptyMap());
-    long time = System.currentTimeMillis();
-    for (int i = 0; i < 20000; i++) {
-      sgManager.insert(new TSRecord(i, "root.test.d1").addTuple(new IntDataPoint("s1", i)), false);
-      sgManager.insert(new TSRecord(i, "root.test.d2").addTuple(new IntDataPoint("s1", i)), false);
-    }
-    //for (int i=0; i< 2; i++) {
-      QueryExpression qe = QueryExpression
-          .create(Collections.singletonList(new Path("root.test.d1", "s1")), null);
-      QueryDataSet result = queryManager.query(qe, TEST_QUERY_CONTEXT);
-      while (result.hasNext()) {
-        RowRecord record = result.next();
-        //System.out.println(record.getTimestamp() + "," + record.getFields().get(0).getIntV());
-      }
-    //}
-
-  }
-
-}