You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/05/08 06:20:48 UTC

[incubator-iotdb] branch refactor_bufferwrite_new updated (8a6f8af -> 86553cd)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch refactor_bufferwrite_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 8a6f8af  merge with master
     add 67647fa  release memory asap in ReadOnlyMemChunk
     add 0fb2a96  Merge pull request #111 from apache/fix_faster_release_mem
     add 1f13d9f   Fix a clerical error in console display when run 'start-server.sh' (#171)
     add 7675fc5  [IOTDB-84]fix in proper hashcode methods in processors (#169)
     add 4f554c7  Refactor query resource count (#168)
     new 86553cd  merge with master

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 iotdb/iotdb/conf/iotdb-env.sh                      |   2 +-
 .../engine/bufferwrite/BufferWriteProcessor.java   |  23 +--
 .../iotdb/db/engine/filenode/FileNodeManager.java  |  18 +--
 .../db/engine/filenode/FileNodeProcessor.java      | 156 ++++++++++++---------
 .../db/engine/overflow/io/OverflowProcessor.java   |  33 +----
 .../db/engine/querycontext/ReadOnlyMemChunk.java   |   2 +
 .../db/engine/tsfiledata/TsFileProcessor.java      |  27 +---
 .../db/exception/FileNodeManagerException.java     |   4 +
 8 files changed, 119 insertions(+), 146 deletions(-)


[incubator-iotdb] 01/01: merge with master

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch refactor_bufferwrite_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 86553cda13ba9cc19e873bc9086ca994738e509e
Merge: 8a6f8af 4f554c7
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Wed May 8 14:20:36 2019 +0800

    merge with master

 iotdb/iotdb/conf/iotdb-env.sh                      |   2 +-
 .../engine/bufferwrite/BufferWriteProcessor.java   |  23 +--
 .../iotdb/db/engine/filenode/FileNodeManager.java  |  18 +--
 .../db/engine/filenode/FileNodeProcessor.java      | 156 ++++++++++++---------
 .../db/engine/overflow/io/OverflowProcessor.java   |  33 +----
 .../db/engine/querycontext/ReadOnlyMemChunk.java   |   2 +
 .../db/engine/tsfiledata/TsFileProcessor.java      |  27 +---
 .../db/exception/FileNodeManagerException.java     |   4 +
 8 files changed, 119 insertions(+), 146 deletions(-)

diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index 066d4b1,0000000..5e769c1
mode 100644,000000..100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@@ -1,839 -1,0 +1,816 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.engine.tsfiledata;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.file.Files;
 +import java.nio.file.Paths;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Objects;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.locks.ReentrantLock;
 +import org.apache.iotdb.db.conf.IoTDBConfig;
 +import org.apache.iotdb.db.conf.IoTDBConstant;
 +import org.apache.iotdb.db.conf.IoTDBDescriptor;
 +import org.apache.iotdb.db.conf.directories.Directories;
 +import org.apache.iotdb.db.engine.Processor;
 +import org.apache.iotdb.db.engine.bufferwrite.Action;
 +import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
 +import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
 +import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 +import org.apache.iotdb.db.engine.filenode.TsFileResource;
 +import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
 +import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
 +import org.apache.iotdb.db.engine.memtable.IMemTable;
 +import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 +import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
 +import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 +import org.apache.iotdb.db.engine.modification.Deletion;
 +import org.apache.iotdb.db.engine.modification.Modification;
 +import org.apache.iotdb.db.engine.pool.FlushManager;
 +import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
 +import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 +import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
 +import org.apache.iotdb.db.engine.version.VersionController;
 +import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 +import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 +import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 +import org.apache.iotdb.db.query.context.QueryContext;
 +import org.apache.iotdb.db.utils.ImmediateFuture;
 +import org.apache.iotdb.db.utils.MemUtils;
 +import org.apache.iotdb.db.utils.QueryUtils;
 +import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 +import org.apache.iotdb.db.writelog.node.WriteLogNode;
 +import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 +import org.apache.iotdb.tsfile.read.common.Path;
 +import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 +import org.apache.iotdb.tsfile.write.schema.FileSchema;
 +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * Each storage group has a TsFileProcessor instance. Though there are many clients read/write data
 + * by accessing TsFileProcessor, they need to get the readWriteReentrantLock of this processor
 + * first. Therefore, as for different clients, the class looks like thread safe.
 + * <br/> The class has two backend tasks:
 + *  (1) submit a flush job to flush data from memtable to disk async;
 + *  (2) close a tsfile and open a new one.
 + * users also need to get the write lock to call these two tasks.
 + */
 +public class TsFileProcessor extends Processor {
 +
 +  private static final Logger LOGGER = LoggerFactory.getLogger(TsFileProcessor.class);
 +  public static final int WRITE_SUCCESS = 1;
 +  public static final int WRITE_IN_WARNING_MEM= 0;
 +  public static final int WRITE_REJECT_BY_TIME  = -1;
 +  public static final int WRITE_REJECT_BY_MEM = -2;
 +
 +  //this is just a part of fileSchemaRef: only the measurements that belong to this TsFileProcessor
 +  // are in this fileSchemaRef. And, this filed is shared with other classes (i.e., storage group
 +  // processor), so be careful if you modify it.
 +  private FileSchema fileSchemaRef;
 +
 +
 +  private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
 +  //when a flush task is in the backend, then it has not conflict with write operatons (because
 +  // write operation modifies wokMemtable, while flush task uses flushMemtable). However, the flush
 +  // task has concurrent problem with query operations, because the query needs to read data from
 +  // flushMemtable and the table may be clear if the flush operation ends. So, a Lock is needed.
 +  private ReentrantLock flushQueryLock = new ReentrantLock();
 +  private AtomicLong memSize = new AtomicLong();
 +
 +  //fileNamePrefix (system time, rather than data time) time unit: nanosecond
 +  // this is used for generate the new TsFile name
 +  private long fileNamePrefix = System.nanoTime();
 +  //the times of calling insertion function (between two flush operations).
 +  private long valueCount = 0;
 +
 +
 +  private IMemTable workMemTable;
 +  private IMemTable flushMemTable;
 +  private RestorableTsFileIOWriter writer;
 +  private Action beforeFlushAction;
 +  private Action afterCloseAction;
 +  private Action afterFlushAction;
 +  private File insertFile;
 +  private TsFileResource currentResource;
 +
 +  private List<TsFileResource> tsFileResources;
 +  private Map<String, List<TsFileResource>> inverseIndexOfResource;
 +
 +  private Map<String, Long> minWrittenTimeForEachDeviceInCurrentFile;
 +  private Map<String, Long> maxWrittenTimeForEachDeviceInCurrentFile;
 +  private Map<String, Long> lastFlushedTimeForEachDevice;
 +
 +  private WriteLogNode logNode;
 +  private VersionController versionController;
 +
 +  /**
 +   * constructor of BufferWriteProcessor. data will be stored in baseDir/processorName/ folder.
 +   *
 +   * @param processorName processor name
 +   * @param fileSchemaRef file schema
 +   * @throws BufferWriteProcessorException BufferWriteProcessorException
 +   */
 +  @SuppressWarnings({"squid:S2259", "squid:S3776"})
 +  public TsFileProcessor(String processorName,
 +      Action beforeFlushAction, Action afterFlushAction, Action afterCloseAction,
 +      VersionController versionController,
 +      FileSchema fileSchemaRef) throws BufferWriteProcessorException, IOException {
 +    super(processorName);
 +    this.fileSchemaRef = fileSchemaRef;
 +    this.processorName = processorName;
 +
 +    this.beforeFlushAction = beforeFlushAction;
 +    this.afterCloseAction = afterCloseAction;
 +    this.afterFlushAction = afterFlushAction;
 +    workMemTable = new PrimitiveMemTable();
 +    tsFileResources = new ArrayList<>();
 +    inverseIndexOfResource = new HashMap<>();
 +    lastFlushedTimeForEachDevice = new HashMap<>();
 +    minWrittenTimeForEachDeviceInCurrentFile = new HashMap<>();
 +    maxWrittenTimeForEachDeviceInCurrentFile = new HashMap<>();
 +    File unclosedFile = null;
 +    String unclosedFileName = null;
 +    int unclosedFileCount = 0;
 +    for (String folderPath : getAllDataFolders()) {
 +      File dataFolder = new File(folderPath, processorName);
 +      if (dataFolder.exists()) {
 +        // we do not add the unclosed tsfile into tsFileResources.
 +        File[] unclosedFiles = dataFolder
 +            .listFiles(x -> x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX));
 +        unclosedFileCount += unclosedFiles.length;
 +        if (unclosedFileCount > 1) {
 +          break;
 +        } else if (unclosedFileCount == 1) {
 +          unclosedFileName = unclosedFiles[0].getName()
 +              .split(RestorableTsFileIOWriter.RESTORE_SUFFIX)[0];
 +          unclosedFile = new File(unclosedFiles[0].getParentFile(), unclosedFileName);
 +        }
 +        File[] datas = dataFolder
 +            .listFiles(x -> !x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX)
 +                && x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR).length == 2);
 +        Arrays.sort(datas, Comparator.comparingLong(x -> Long
 +            .parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
 +        for (File tsfile : datas) {
 +          //TODO we'd better define a file suffix for TsFile, e.g., .ts
 +          String[] names = tsfile.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR);
 +          long time = Long.parseLong(names[0]);
 +          if (fileNamePrefix < time) {
 +            fileNamePrefix = time;
 +          }
 +          if (unclosedFileCount == 0 || !tsfile.getName().equals(unclosedFileName)) {
 +            TsFileResource resource = new TsFileResource(tsfile, true);
 +            tsFileResources.add(resource);
 +            //maintain the inverse index and fileNamePrefix
 +            for (String device : resource.getDevices()) {
 +              inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>()).add(resource);
 +              lastFlushedTimeForEachDevice
 +                  .merge(device, resource.getEndTime(device), (x, y) -> x > y ? x : y);
 +            }
 +          }
 +        }
 +      } else {
 +        //processor folder does not exist
 +        dataFolder.mkdirs();
 +      }
 +    }
 +    if (unclosedFileCount > 1) {
 +      throw new BufferWriteProcessorException(String
 +          .format("TsProcessor %s has more than one unclosed TsFile. please repair it",
 +              processorName));
 +    } else if (unclosedFileCount == 0) {
 +      unclosedFile = generateNewTsFilePath();
 +    }
 +
 +    initCurrentTsFile(unclosedFile);
 +
 +    this.versionController = versionController;
 +    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
 +      try {
 +        logNode = MultiFileLogNodeManager.getInstance().getNode(
 +            processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
 +            writer.getRestoreFilePath(),
 +            FileNodeManager.getInstance().getRestoreFilePath(processorName));
 +      } catch (IOException e) {
 +        throw new BufferWriteProcessorException(e);
 +      }
 +    }
 +  }
 +
 +
 +  private File generateNewTsFilePath() throws BufferWriteProcessorException {
 +    String dataDir = getNextDataFolder();
 +    File dataFolder = new File(dataDir, processorName);
 +    if (!dataFolder.exists()) {
 +      if (!dataFolder.mkdirs()) {
 +        throw new BufferWriteProcessorException(
 +            String.format("Can not create TsFileProcess related folder: %s", dataFolder));
 +      }
 +      LOGGER.debug("The bufferwrite processor data dir doesn't exists, create new directory {}.",
 +          dataFolder.getAbsolutePath());
 +    }
 +    String fileName = (fileNamePrefix + 1) + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
 +        + System.currentTimeMillis();
 +    return new File(dataFolder, fileName);
 +  }
 +
 +
 +  private void initCurrentTsFile(File file) throws BufferWriteProcessorException {
 +    this.insertFile = file;
 +    try {
 +      writer = new RestorableTsFileIOWriter(processorName, insertFile.getAbsolutePath());
 +      this.currentResource = new TsFileResource(insertFile, writer);
 +    } catch (IOException e) {
 +      throw new BufferWriteProcessorException(e);
 +    }
 +
 +    minWrittenTimeForEachDeviceInCurrentFile.clear();
 +    maxWrittenTimeForEachDeviceInCurrentFile.clear();
 +
 +  }
 +
 +  protected boolean canWrite(String device, long timestamp) {
 +    return !lastFlushedTimeForEachDevice.containsKey(device)
 +        || timestamp > lastFlushedTimeForEachDevice.get(device);
 +  }
 +  /**
 +   * wrete a ts record into the memtable. If the memory usage is beyond the memThreshold, an async
 +   * flushing operation will be called.
 +   *
 +   * @param plan data to be written
 +   * @return - 1 (WRITE_SUCCESS) if the tsRecord can be inserted into tsFile.
 +   * - 0 (WRITE_IN_WARNING_MEM) if the memory is UsageLevel.WARNING
 +   * - -1 (WRITE_REJECT_BY_TIME) if you need to insert it into overflow
 +   * - -2 (WRITE_REJECT_BY_MEM) if the memory is UsageLevel.ERROR
 +   * @throws BufferWriteProcessorException if a flushing operation occurs and failed.
 +   */
 +  public int insert(InsertPlan plan) throws BufferWriteProcessorException, IOException {
 +    if (!canWrite(plan.getDeviceId(), plan.getTime())) {
 +      return WRITE_REJECT_BY_TIME;
 +    }
 +    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
 +      logNode.write(plan);
 +    }
 +    long memUsage = 0;
 +    TSDataType type;
 +    String measurement;
 +    for (int i=0; i < plan.getMeasurements().length; i++){
 +      measurement = plan.getMeasurements()[i];
 +      type = fileSchemaRef.getMeasurementDataType(measurement);
 +      memUsage += MemUtils.getPointSize(type, measurement);
 +    }
 +    UsageLevel level = BasicMemController.getInstance().acquireUsage(this, memUsage);
 +    switch (level) {
 +      case SAFE:
 +        doInsert(plan);
 +        checkMemThreshold4Flush(memUsage);
 +        return WRITE_SUCCESS;
 +      case WARNING:
 +        if(LOGGER.isWarnEnabled()) {
 +          LOGGER.warn("Memory usage will exceed warning threshold, current : {}.",
 +              MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
 +        }
 +        doInsert(plan);
 +        try {
 +          flush();
 +        } catch (IOException e) {
 +          throw new BufferWriteProcessorException(e);
 +        }
 +        return WRITE_IN_WARNING_MEM;
 +      case DANGEROUS:
 +        if (LOGGER.isWarnEnabled()) {
 +          LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.",
 +              MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
 +        }
 +        return WRITE_REJECT_BY_MEM;
 +      default:
 +        return WRITE_REJECT_BY_MEM;
 +    }
 +  }
 +
 +  private void doInsert(InsertPlan plan) {
 +    String deviceId = plan.getDeviceId();
 +    long time = plan.getTime();
 +    TSDataType type;
 +    String measurement;
 +    for (int i = 0; i < plan.getMeasurements().length; i++) {
 +      measurement = plan.getMeasurements()[i];
 +      type = fileSchemaRef.getMeasurementDataType(measurement);
 +      workMemTable.write(deviceId, measurement, type, time, plan.getValues()[i]);
 +    }
 +    if (!minWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)) {
 +      minWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
 +    }
 +    if (!maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)
 +        || maxWrittenTimeForEachDeviceInCurrentFile.get(deviceId) < time) {
 +      maxWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
 +    }
 +    valueCount++;
 +  }
 +
 +  /**
 +   * Delete data whose timestamp <= 'timestamp' and belonging to timeseries deviceId.measurementId.
 +   * Delete data in both working MemTable and flushing MemTable.
 +   *
 +   * @param deviceId the deviceId of the timeseries to be deleted.
 +   * @param measurementId the measurementId of the timeseries to be deleted.
 +   * @param timestamp the upper-bound of deletion time.
 +   */
 +  public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
 +    workMemTable.delete(deviceId, measurementId, timestamp);
 +    if (maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)
 +        && maxWrittenTimeForEachDeviceInCurrentFile.get(deviceId) < timestamp) {
 +      maxWrittenTimeForEachDeviceInCurrentFile
 +          .put(deviceId, lastFlushedTimeForEachDevice.getOrDefault(deviceId, 0L));
 +    }
 +    boolean deleteFlushTable = false;
 +    if (isFlush()) {
 +      // flushing MemTable cannot be directly modified since another thread is reading it
 +      flushMemTable = flushMemTable.copy();
 +      deleteFlushTable = flushMemTable.delete(deviceId, measurementId, timestamp);
 +    }
 +    String fullPath = deviceId +
 +        IoTDBConstant.PATH_SEPARATOR + measurementId;
 +    Deletion deletion = new Deletion(fullPath, versionController.nextVersion(), timestamp);
 +    if (deleteFlushTable || (currentResource.containsDevice(deviceId)
 +        && currentResource.getStartTime(deviceId) <= timestamp)) {
 +      currentResource.getModFile().write(deletion);
 +    }
 +    for (TsFileResource resource : tsFileResources) {
 +      if (resource.containsDevice(deviceId) && resource.getStartTime(deviceId) <= timestamp) {
 +        resource.getModFile().write(deletion);
 +      }
 +    }
 +    if (lastFlushedTimeForEachDevice.containsKey(deviceId)
 +        && lastFlushedTimeForEachDevice.get(deviceId) <= timestamp) {
 +      lastFlushedTimeForEachDevice.put(deviceId, 0L);
 +    }
 +  }
 +
 +
 +  private void checkMemThreshold4Flush(long addedMemory) throws BufferWriteProcessorException {
 +    long newMem = memSize.addAndGet(addedMemory);
 +    if (newMem > TSFileConfig.groupSizeInByte) {
 +      if (LOGGER.isInfoEnabled()) {
 +        String usageMem = MemUtils.bytesCntToStr(newMem);
 +        String threshold = MemUtils.bytesCntToStr(TSFileConfig.groupSizeInByte);
 +        LOGGER.info("The usage of memory {} in bufferwrite processor {} reaches the threshold {}",
 +            usageMem, processorName, threshold);
 +      }
 +      try {
 +        flush();
 +      } catch (IOException e) {
 +        LOGGER.error("Flush bufferwrite error.", e);
 +        throw new BufferWriteProcessorException(e);
 +      }
 +    }
 +  }
 +
 +
 +  /**
 +   * this method is for preparing a task task and then submitting it.
 +   * @return
 +   * @throws IOException
 +   */
 +  @Override
 +  public Future<Boolean> flush() throws IOException {
 +    // waiting for the end of last flush operation.
 +    try {
 +      flushFuture.get();
 +    } catch (InterruptedException | ExecutionException e) {
 +      LOGGER.error(
 +          "Encounter an interrupt error when waitting for the flushing, the TsFile Processor is {}.",
 +          getProcessorName(), e);
 +      Thread.currentThread().interrupt();
 +    }
 +    // statistic information for flush
 +    if (valueCount <= 0) {
 +      LOGGER.debug(
 +          "TsFile Processor {} has zero data to be flushed, will return directly.", processorName);
 +      flushFuture = new ImmediateFuture<>(true);
 +      return flushFuture;
 +    }
 +
 +    if (LOGGER.isInfoEnabled()) {
 +      long thisFlushTime = System.currentTimeMillis();
 +      LOGGER.info(
 +          "The TsFile Processor {}: last flush time is {}, this flush time is {}, "
 +              + "flush time interval is {} s", getProcessorName(),
 +          DatetimeUtils.convertMillsecondToZonedDateTime(fileNamePrefix / 1000),
 +          DatetimeUtils.convertMillsecondToZonedDateTime(thisFlushTime),
 +          (thisFlushTime - fileNamePrefix / 1000) / 1000);
 +    }
 +    fileNamePrefix = System.nanoTime();
 +
 +    // update the lastUpdatetime, prepare for flush
 +    try {
 +      beforeFlushAction.act();
 +    } catch (Exception e) {
 +      LOGGER.error("Failed to flush memtable into tsfile when calling the action function.");
 +      throw new IOException(e);
 +    }
 +    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
 +      logNode.notifyStartFlush();
 +    }
 +    valueCount = 0;
 +    switchWorkToFlush();
 +    long version = versionController.nextVersion();
 +    BasicMemController.getInstance().releaseUsage(this, memSize.get());
 +    memSize.set(0);
 +    // switch
 +    flushFuture = FlushManager.getInstance().submit(() -> flushTask(version));
 +    return flushFuture;
 +  }
 +
 +  /**
 +   * this method will be concurrent with other methods..
 +   *
 +   * @param version the operation version that will tagged on the to be flushed memtable (i.e.,
 +   * ChunkGroup)
 +   * @return true if successfully.
 +   */
 +  private boolean flushTask(long version) {
 +    boolean result;
 +    long flushStartTime = System.currentTimeMillis();
 +    LOGGER.info("The TsFile Processor {} starts flushing.", processorName);
 +    try {
 +      if (flushMemTable != null && !flushMemTable.isEmpty()) {
 +        // flush data
 +        MemTableFlushUtil.flushMemTable(fileSchemaRef, writer, flushMemTable,
 +            version);
 +        // write restore information
 +        writer.flush();
 +      } else {
 +        //no need to flush.
 +        return true;
 +      }
 +
 +      afterFlushAction.act();
 +      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
 +        logNode.notifyEndFlush(null);
 +      }
 +      result = true;
 +    } catch (Exception e) {
 +      LOGGER.error(
 +          "The TsFile Processor {} failed to flush, when calling the afterFlushAction(filenodeFlushAction).",
 +          processorName, e);
 +      result = false;
 +    } finally {
 +      try {
 +        switchFlushToWork();
 +      } catch (BufferWriteProcessorException e) {
 +        LOGGER.error(e.getMessage());
 +        result = false;
 +      }
 +      LOGGER.info("The TsFile Processor {} ends flushing.", processorName);
 +    }
 +    if (LOGGER.isInfoEnabled()) {
 +      long flushEndTime = System.currentTimeMillis();
 +      LOGGER.info(
 +          "The TsFile Processor {} flush, start time is {}, flush end time is {}, "
 +              + "flush time consumption is {}ms",
 +          processorName,
 +          DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
 +          DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
 +          flushEndTime - flushStartTime);
 +    }
 +    return result;
 +  }
 +
 +  private void switchWorkToFlush() {
 +    flushQueryLock.lock();
 +    try {
 +      if (flushMemTable == null) {
 +        flushMemTable = workMemTable;
 +        for (String device : flushMemTable.getMemTableMap().keySet()) {
 +          lastFlushedTimeForEachDevice.put(device, maxWrittenTimeForEachDeviceInCurrentFile.get(device));
 +        }
 +        workMemTable = new PrimitiveMemTable();
 +      }
 +    } finally {
 +      flushQueryLock.unlock();
 +    }
 +  }
 +
 +  private void switchFlushToWork() throws BufferWriteProcessorException {
 +    flushQueryLock.lock();
 +    try {
 +      //we update the index of currentTsResource.
 +      for (String device : flushMemTable.getMemTableMap().keySet()) {
 +        currentResource.setStartTime(device, minWrittenTimeForEachDeviceInCurrentFile.get(device));
 +        // new end time must be larger than the old one.
 +        currentResource.setEndTime(device, lastFlushedTimeForEachDevice.get(device));
 +      }
 +      flushMemTable.clear();
 +      flushMemTable = null;
 +      //make chunk groups in this flush task visble
 +      writer.appendMetadata();
 +      if (needCloseCurrentFile()) {
 +        closeCurrentTsFileAndOpenNewOne();
 +      }
 +    } finally {
 +      flushQueryLock.unlock();
 +    }
 +  }
 +
 +  @Override
 +  public boolean canBeClosed() {
 +    return true;
 +  }
 +
 +
 +  /**
 +   * this method do not call flush() to flush data in memory to disk.
 +   * @throws BufferWriteProcessorException
 +   */
 +  private void closeCurrentTsFileAndOpenNewOne() throws BufferWriteProcessorException {
 +    closeCurrentFile();
 +    initCurrentTsFile(generateNewTsFilePath());
 +  }
 +
 +  //very dangerous, how to make sure this function is thread safe (no other functions are running)
 +  private void closeCurrentFile() throws BufferWriteProcessorException {
 +    try {
 +      long closeStartTime = System.currentTimeMillis();
 +      // end file
 +      if (writer.getChunkGroupMetaDatas().isEmpty()){
 +        //this is an empty TsFile, we do not need to save it...
 +        writer.endFile(fileSchemaRef);
 +        Files.delete(Paths.get(insertFile.getAbsolutePath()));
 +      } else {
 +        writer.endFile(fileSchemaRef);
 +        // update the IntervalFile for interval list
 +        afterCloseAction.act();
 +        // flush the changed information for filenode
 +        afterFlushAction.act();
 +
 +        tsFileResources.add(currentResource);
 +        //maintain the inverse index
 +        for (String device : currentResource.getDevices()) {
 +          inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>())
 +              .add(currentResource);
 +        }
 +      }
 +
 +      // delete the restore for this bufferwrite processor
 +      if (LOGGER.isInfoEnabled()) {
 +        long closeEndTime = System.currentTimeMillis();
 +        LOGGER.info(
 +            "Close current TsFile {}, start time is {}, end time is {}, time consumption is {}ms",
 +            insertFile.getAbsolutePath(),
 +            DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
 +            DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
 +            closeEndTime - closeStartTime);
 +      }
 +
 +      workMemTable.clear();
 +      if (flushMemTable != null) {
 +        flushMemTable.clear();
 +      }
 +      if (logNode != null) {
 +        logNode.close();
 +      }
 +
 +    } catch (IOException e) {
 +      LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.",
 +          getProcessorName(), e);
 +      throw new BufferWriteProcessorException(e);
 +    } catch (Exception e) {
 +      LOGGER
 +          .error("Failed to close the bufferwrite processor when calling the action function.", e);
 +      throw new BufferWriteProcessorException(e);
 +    }
 +  }
 +
 +  @Override
 +  public long memoryUsage() {
 +    return 0;
 +  }
 +
 +  /**
 +   * check if is flushing.
 +   *
 +   * @return True if flushing
 +   */
 +  public boolean isFlush() {
 +    // starting a flush task has two steps: set the flushMemtable, and then set the flushFuture
 +    // So, the following case exists: flushMemtable != null but flushFuture is done (because the
 +    // flushFuture refers to the last finished flush.
 +    // And, the following case exists,too: flushMemtable == null, but flushFuture is not done.
 +    // (flushTask() is not finished, but switchToWork() has done)
 +    // So, checking flushMemTable is more meaningful than flushFuture.isDone().
 +    return flushMemTable != null;
 +  }
 +
 +
 +  /**
 +   * query data.
 +   */
 +  public GlobalSortedSeriesDataSource query(SingleSeriesExpression expression,
 +      QueryContext context) throws IOException {
 +    MeasurementSchema mSchema;
 +    TSDataType dataType;
 +
 +    String deviceId = expression.getSeriesPath().getDevice();
 +    String measurementId = expression.getSeriesPath().getMeasurement();
 +
 +    mSchema = fileSchemaRef.getMeasurementSchema(measurementId);
 +    dataType = mSchema.getType();
 +
 +    // tsfile dataØØ
 +    List<TsFileResource> dataFiles = new ArrayList<>();
 +    for (TsFileResource tsfile : tsFileResources) {
 +      //TODO in the old version, tsfile is deep copied. I do not know why
 +      dataFiles.add(tsfile);
 +    }
 +    // bufferwrite data
 +    //TODO unsealedTsFile class is a little redundant.
 +    UnsealedTsFile unsealedTsFile = null;
 +
 +    if (currentResource.getStartTime(deviceId) >= 0) {
 +      unsealedTsFile = new UnsealedTsFile();
 +      unsealedTsFile.setFilePath(currentResource.getFile().getAbsolutePath());
 +      List<ChunkMetaData> chunks = writer.getMetadatas(deviceId, measurementId, dataType);
 +      List<Modification> pathModifications = context.getPathModifications(
 +          currentResource.getModFile(), deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId);
 +      if (!pathModifications.isEmpty()) {
 +        QueryUtils.modifyChunkMetaData(chunks, pathModifications);
 +      }
 +
 +      unsealedTsFile.setTimeSeriesChunkMetaDatas(chunks);
 +    }
 +    return new GlobalSortedSeriesDataSource(
 +        new Path(deviceId + "." + measurementId), dataFiles, unsealedTsFile,
 +        queryDataInMemtable(deviceId, measurementId, dataType, mSchema.getProps()));
 +  }
 +
 +  /**
 +   * get the one (or two) chunk(s) in the memtable ( and the other one in flushing status and then
 +   * compact them into one TimeValuePairSorter). Then get its (or their) ChunkMetadata(s).
 +   *
 +   * @param deviceId device id
 +   * @param measurementId sensor id
 +   * @param dataType data type
 +   * @return corresponding chunk data in memory
 +   */
 +  private ReadOnlyMemChunk queryDataInMemtable(String deviceId,
 +      String measurementId, TSDataType dataType, Map<String, String> props) {
 +    flushQueryLock.lock();
 +    try {
 +      MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
 +      if (flushMemTable != null) {
 +        memSeriesLazyMerger.addMemSeries(flushMemTable.query(deviceId, measurementId, dataType, props));
 +      }
 +      memSeriesLazyMerger.addMemSeries(workMemTable.query(deviceId, measurementId, dataType, props));
 +      // memSeriesLazyMerger has handled the props,
 +      // so we do not need to handle it again in the following readOnlyMemChunk
 +      return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger, Collections.emptyMap());
 +    } finally {
 +      flushQueryLock.unlock();
 +    }
 +  }
 +
 +
 +  public String getInsertFilePath() {
 +    return insertFile.getAbsolutePath();
 +  }
 +
 +  public WriteLogNode getLogNode() {
 +    return logNode;
 +  }
 +
 +  /**
 +   * used for test. We can know when the flush() is called.
 +   *
 +   * @return the last flush() time. Time unit: nanosecond.
 +   */
 +  public long getFileNamePrefix() {
 +    return fileNamePrefix;
 +  }
 +
 +  /**
 +   * used for test. We can block to wait for finishing flushing.
 +   *
 +   * @return the future of the flush() task.
 +   */
 +  public Future<Boolean> getFlushFuture() {
 +    return flushFuture;
 +  }
 +
 +  @Override
 +  public void close() throws BufferWriteProcessorException {
 +    closeCurrentFile();
 +    try {
 +      if (currentResource != null) {
 +        currentResource.close();
 +      }
 +      for (TsFileResource resource : tsFileResources) {
 +        resource.close();
 +      }
 +
 +    } catch (IOException e) {
 +      throw new BufferWriteProcessorException(e);
 +    }
 +  }
 +
 +  /**
 +   * remove all data of this processor. Used For UT
 +   */
 +  public void removeMe() throws BufferWriteProcessorException, IOException {
 +    try {
 +      flushFuture.get(10000, TimeUnit.MILLISECONDS);
 +    } catch (ExecutionException | TimeoutException e) {
 +      LOGGER.error("can not end running flush task in 10 seconds: {}", e.getMessage());
 +    } catch (InterruptedException e) {
 +      LOGGER.error("current running flush task is interrupted.", e);
 +      Thread.currentThread().interrupt();
 +    }
 +    close();
 +    for (String folder : Directories.getInstance().getAllTsFileFolders()) {
 +      File dataFolder = new File(folder, processorName);
 +      if (dataFolder.exists()) {
 +        for (File file: dataFolder.listFiles()) {
 +          Files.deleteIfExists(Paths.get(file.getAbsolutePath()));
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Check if the currentTsFileResource is toooo large.
 +   * @return  true if the file is too large.
 +   */
 +  private boolean needCloseCurrentFile() {
 +    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 +    long fileSize = currentResource.getFile().length();
 +    if (fileSize >= config.getBufferwriteFileSizeThreshold()) {
 +      if (LOGGER.isInfoEnabled()) {
 +        LOGGER.info(
 +            "The bufferwrite processor {}, size({}) of the file {} reaches threshold {}.",
 +            processorName, MemUtils.bytesCntToStr(fileSize), currentResource.getFilePath(),
 +            MemUtils.bytesCntToStr(config.getBufferwriteFileSizeThreshold()));
 +      }
 +      return true;
 +    }
 +    return false;
 +  }
 +
 +  protected List<String> getAllDataFolders() {
 +    return Directories.getInstance().getAllTsFileFolders();
 +  }
 +
 +  protected String getNextDataFolder() {
 +    return Directories.getInstance().getNextFolderForTsfile();
 +  }
 +
 +  @Override
 +  public boolean equals(Object o) {
 +    if (this == o) {
 +      return true;
 +    }
 +    if (!(o instanceof TsFileProcessor)) {
 +      return false;
 +    }
 +    if (!super.equals(o)) {
 +      return false;
 +    }
-     TsFileProcessor that = (TsFileProcessor) o;
-     return fileNamePrefix == that.fileNamePrefix &&
-         valueCount == that.valueCount &&
-         Objects.equals(fileSchemaRef, that.fileSchemaRef) &&
-         Objects.equals(flushFuture, that.flushFuture) &&
-         Objects.equals(flushQueryLock, that.flushQueryLock) &&
-         Objects.equals(memSize, that.memSize) &&
-         Objects.equals(workMemTable, that.workMemTable) &&
-         Objects.equals(flushMemTable, that.flushMemTable) &&
-         Objects.equals(writer, that.writer) &&
-         Objects.equals(beforeFlushAction, that.beforeFlushAction) &&
-         Objects.equals(afterCloseAction, that.afterCloseAction) &&
-         Objects.equals(afterFlushAction, that.afterFlushAction) &&
-         Objects.equals(insertFile, that.insertFile) &&
-         Objects.equals(currentResource, that.currentResource) &&
-         Objects.equals(tsFileResources, that.tsFileResources) &&
-         Objects.equals(inverseIndexOfResource, that.inverseIndexOfResource) &&
-         Objects.equals(lastFlushedTimeForEachDevice, that.lastFlushedTimeForEachDevice) &&
-         Objects.equals(logNode, that.logNode) &&
-         Objects.equals(versionController, that.versionController);
++    return this == o;
 +  }
 +
 +  @Override
 +  public int hashCode() {
-     return Objects
-         .hash(super.hashCode(), fileSchemaRef, flushFuture, flushQueryLock, memSize, fileNamePrefix,
-             valueCount, workMemTable, flushMemTable, writer, beforeFlushAction, afterCloseAction,
-             afterFlushAction, insertFile, currentResource, tsFileResources, inverseIndexOfResource,
-             lastFlushedTimeForEachDevice, logNode, versionController);
++    return super.hashCode();
 +  }
 +}