Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/05/08 06:20:49 UTC

[incubator-iotdb] 01/01: merge with master

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch refactor_bufferwrite_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 86553cda13ba9cc19e873bc9086ca994738e509e
Merge: 8a6f8af 4f554c7
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Wed May 8 14:20:36 2019 +0800

    merge with master

 iotdb/iotdb/conf/iotdb-env.sh                      |   2 +-
 .../engine/bufferwrite/BufferWriteProcessor.java   |  23 +--
 .../iotdb/db/engine/filenode/FileNodeManager.java  |  18 +--
 .../db/engine/filenode/FileNodeProcessor.java      | 156 ++++++++++++---------
 .../db/engine/overflow/io/OverflowProcessor.java   |  33 +----
 .../db/engine/querycontext/ReadOnlyMemChunk.java   |   2 +
 .../db/engine/tsfiledata/TsFileProcessor.java      |  27 +---
 .../db/exception/FileNodeManagerException.java     |   4 +
 8 files changed, 119 insertions(+), 146 deletions(-)

diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index 066d4b1,0000000..5e769c1
mode 100644,000000..100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@@ -1,839 -1,0 +1,816 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.engine.tsfiledata;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.file.Files;
 +import java.nio.file.Paths;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Objects;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.locks.ReentrantLock;
 +import org.apache.iotdb.db.conf.IoTDBConfig;
 +import org.apache.iotdb.db.conf.IoTDBConstant;
 +import org.apache.iotdb.db.conf.IoTDBDescriptor;
 +import org.apache.iotdb.db.conf.directories.Directories;
 +import org.apache.iotdb.db.engine.Processor;
 +import org.apache.iotdb.db.engine.bufferwrite.Action;
 +import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
 +import org.apache.iotdb.db.engine.bufferwrite.RestorableTsFileIOWriter;
 +import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 +import org.apache.iotdb.db.engine.filenode.TsFileResource;
 +import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
 +import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
 +import org.apache.iotdb.db.engine.memtable.IMemTable;
 +import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 +import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
 +import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 +import org.apache.iotdb.db.engine.modification.Deletion;
 +import org.apache.iotdb.db.engine.modification.Modification;
 +import org.apache.iotdb.db.engine.pool.FlushManager;
 +import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
 +import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 +import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
 +import org.apache.iotdb.db.engine.version.VersionController;
 +import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 +import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 +import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 +import org.apache.iotdb.db.query.context.QueryContext;
 +import org.apache.iotdb.db.utils.ImmediateFuture;
 +import org.apache.iotdb.db.utils.MemUtils;
 +import org.apache.iotdb.db.utils.QueryUtils;
 +import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 +import org.apache.iotdb.db.writelog.node.WriteLogNode;
 +import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 +import org.apache.iotdb.tsfile.read.common.Path;
 +import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 +import org.apache.iotdb.tsfile.write.schema.FileSchema;
 +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * Each storage group has a TsFileProcessor instance. Although many clients may read/write data
 + * through a TsFileProcessor, they must first acquire this processor's readWriteReentrantLock;
 + * therefore, from the clients' point of view, the class behaves as if it were thread safe.
 + * <br/> The class has two backend tasks:
 + *  (1) submitting a flush job that asynchronously flushes data from the memtable to disk;
 + *  (2) closing the current TsFile and opening a new one.
 + * Callers also need to hold the write lock to trigger these two tasks.
 + */
 +public class TsFileProcessor extends Processor {
 +
 +  private static final Logger LOGGER = LoggerFactory.getLogger(TsFileProcessor.class);
 +  public static final int WRITE_SUCCESS = 1;
 +  public static final int WRITE_IN_WARNING_MEM = 0;
 +  public static final int WRITE_REJECT_BY_TIME = -1;
 +  public static final int WRITE_REJECT_BY_MEM = -2;
 +
 +  //this holds just a part of the schema: only the measurements that belong to this TsFileProcessor
 +  // are in this fileSchemaRef. Also, this field is shared with other classes (i.e., the storage
 +  // group processor), so be careful if you modify it.
 +  private FileSchema fileSchemaRef;
 +
 +
 +  private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
 +  //while a flush task runs in the backend, it does not conflict with write operations (a write
 +  // operation modifies workMemTable, while the flush task uses flushMemTable). However, the flush
 +  // task does conflict with query operations, because a query needs to read data from
 +  // flushMemTable, and that table may be cleared when the flush operation ends. So, a lock is needed.
 +  private ReentrantLock flushQueryLock = new ReentrantLock();
 +  private AtomicLong memSize = new AtomicLong();
 +
 +  //fileNamePrefix (system time, rather than data time); time unit: nanosecond.
 +  // It is used to generate the name of a new TsFile.
 +  private long fileNamePrefix = System.nanoTime();
 +  //the number of times the insert function has been called (between two flush operations).
 +  private long valueCount = 0;
 +
 +
 +  private IMemTable workMemTable;
 +  private IMemTable flushMemTable;
 +  private RestorableTsFileIOWriter writer;
 +  private Action beforeFlushAction;
 +  private Action afterCloseAction;
 +  private Action afterFlushAction;
 +  private File insertFile;
 +  private TsFileResource currentResource;
 +
 +  private List<TsFileResource> tsFileResources;
 +  private Map<String, List<TsFileResource>> inverseIndexOfResource;
 +
 +  private Map<String, Long> minWrittenTimeForEachDeviceInCurrentFile;
 +  private Map<String, Long> maxWrittenTimeForEachDeviceInCurrentFile;
 +  private Map<String, Long> lastFlushedTimeForEachDevice;
 +
 +  private WriteLogNode logNode;
 +  private VersionController versionController;
 +
 +  /**
 +   * constructor of TsFileProcessor. Data will be stored in the baseDir/processorName/ folder.
 +   *
 +   * @param processorName processor name
 +   * @param fileSchemaRef file schema
 +   * @throws BufferWriteProcessorException BufferWriteProcessorException
 +   */
 +  @SuppressWarnings({"squid:S2259", "squid:S3776"})
 +  public TsFileProcessor(String processorName,
 +      Action beforeFlushAction, Action afterFlushAction, Action afterCloseAction,
 +      VersionController versionController,
 +      FileSchema fileSchemaRef) throws BufferWriteProcessorException, IOException {
 +    super(processorName);
 +    this.fileSchemaRef = fileSchemaRef;
 +    this.processorName = processorName;
 +
 +    this.beforeFlushAction = beforeFlushAction;
 +    this.afterCloseAction = afterCloseAction;
 +    this.afterFlushAction = afterFlushAction;
 +    workMemTable = new PrimitiveMemTable();
 +    tsFileResources = new ArrayList<>();
 +    inverseIndexOfResource = new HashMap<>();
 +    lastFlushedTimeForEachDevice = new HashMap<>();
 +    minWrittenTimeForEachDeviceInCurrentFile = new HashMap<>();
 +    maxWrittenTimeForEachDeviceInCurrentFile = new HashMap<>();
 +    File unclosedFile = null;
 +    String unclosedFileName = null;
 +    int unclosedFileCount = 0;
 +    for (String folderPath : getAllDataFolders()) {
 +      File dataFolder = new File(folderPath, processorName);
 +      if (dataFolder.exists()) {
 +        // we do not add the unclosed tsfile into tsFileResources.
 +        File[] unclosedFiles = dataFolder
 +            .listFiles(x -> x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX));
 +        unclosedFileCount += unclosedFiles.length;
 +        if (unclosedFileCount > 1) {
 +          break;
 +        } else if (unclosedFileCount == 1) {
 +          unclosedFileName = unclosedFiles[0].getName()
 +              .split(RestorableTsFileIOWriter.RESTORE_SUFFIX)[0];
 +          unclosedFile = new File(unclosedFiles[0].getParentFile(), unclosedFileName);
 +        }
 +        File[] datas = dataFolder
 +            .listFiles(x -> !x.getName().contains(RestorableTsFileIOWriter.RESTORE_SUFFIX)
 +                && x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR).length == 2);
 +        Arrays.sort(datas, Comparator.comparingLong(x -> Long
 +            .parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
 +        for (File tsfile : datas) {
 +          //TODO we'd better define a file suffix for TsFile, e.g., .ts
 +          String[] names = tsfile.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR);
 +          long time = Long.parseLong(names[0]);
 +          if (fileNamePrefix < time) {
 +            fileNamePrefix = time;
 +          }
 +          if (unclosedFileCount == 0 || !tsfile.getName().equals(unclosedFileName)) {
 +            TsFileResource resource = new TsFileResource(tsfile, true);
 +            tsFileResources.add(resource);
 +            //maintain the inverse index and lastFlushedTimeForEachDevice
 +            for (String device : resource.getDevices()) {
 +              inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>()).add(resource);
 +              lastFlushedTimeForEachDevice
 +                  .merge(device, resource.getEndTime(device), (x, y) -> x > y ? x : y);
 +            }
 +          }
 +        }
 +      } else {
 +        //processor folder does not exist
 +        dataFolder.mkdirs();
 +      }
 +    }
 +    if (unclosedFileCount > 1) {
 +      throw new BufferWriteProcessorException(String
 +          .format("TsProcessor %s has more than one unclosed TsFile. please repair it",
 +              processorName));
 +    } else if (unclosedFileCount == 0) {
 +      unclosedFile = generateNewTsFilePath();
 +    }
 +
 +    initCurrentTsFile(unclosedFile);
 +
 +    this.versionController = versionController;
 +    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
 +      try {
 +        logNode = MultiFileLogNodeManager.getInstance().getNode(
 +            processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
 +            writer.getRestoreFilePath(),
 +            FileNodeManager.getInstance().getRestoreFilePath(processorName));
 +      } catch (IOException e) {
 +        throw new BufferWriteProcessorException(e);
 +      }
 +    }
 +  }
 +
 +
 +  private File generateNewTsFilePath() throws BufferWriteProcessorException {
 +    String dataDir = getNextDataFolder();
 +    File dataFolder = new File(dataDir, processorName);
 +    if (!dataFolder.exists()) {
 +      if (!dataFolder.mkdirs()) {
 +        throw new BufferWriteProcessorException(
 +            String.format("Can not create TsFileProcess related folder: %s", dataFolder));
 +      }
 +      LOGGER.debug("The bufferwrite processor data dir doesn't exist, creating new directory {}.",
 +          dataFolder.getAbsolutePath());
 +    }
 +    String fileName = (fileNamePrefix + 1) + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
 +        + System.currentTimeMillis();
 +    return new File(dataFolder, fileName);
 +  }
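 +
 +  // Illustrative sketch of the naming scheme above (assuming
 +  // FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR is "-"): a generated file name looks like
 +  //   123456789000001-1557296436000
 +  // i.e., (fileNamePrefix + 1) in nanoseconds, the separator, and the wall-clock creation time in
 +  // milliseconds. The constructor recovers the first field to keep fileNamePrefix monotonic:
 +  //   long time = Long.parseLong(
 +  //       tsfile.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0]);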
 +
 +
 +  private void initCurrentTsFile(File file) throws BufferWriteProcessorException {
 +    this.insertFile = file;
 +    try {
 +      writer = new RestorableTsFileIOWriter(processorName, insertFile.getAbsolutePath());
 +      this.currentResource = new TsFileResource(insertFile, writer);
 +    } catch (IOException e) {
 +      throw new BufferWriteProcessorException(e);
 +    }
 +
 +    minWrittenTimeForEachDeviceInCurrentFile.clear();
 +    maxWrittenTimeForEachDeviceInCurrentFile.clear();
 +
 +  }
 +
 +  protected boolean canWrite(String device, long timestamp) {
 +    return !lastFlushedTimeForEachDevice.containsKey(device)
 +        || timestamp > lastFlushedTimeForEachDevice.get(device);
 +  }
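 +
 +  // For example, if device "root.sg.d1" was last flushed up to time 100, then
 +  // canWrite("root.sg.d1", 100) == false (such a point must go to overflow), while
 +  // canWrite("root.sg.d1", 101) == true. A device that has never been flushed can always be
 +  // written.
 +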
 +  /**
 +   * write a ts record into the memtable. If the memory usage is beyond the memThreshold, an async
 +   * flushing operation will be called.
 +   *
 +   * @param plan data to be written
 +   * @return - 1 (WRITE_SUCCESS) if the tsRecord can be inserted into tsFile.
 +   * - 0 (WRITE_IN_WARNING_MEM) if the memory is UsageLevel.WARNING
 +   * - -1 (WRITE_REJECT_BY_TIME) if you need to insert it into overflow
 +   * - -2 (WRITE_REJECT_BY_MEM) if the memory is UsageLevel.ERROR
 +   * @throws BufferWriteProcessorException if a flushing operation occurs and failed.
 +   */
 +  public int insert(InsertPlan plan) throws BufferWriteProcessorException, IOException {
 +    if (!canWrite(plan.getDeviceId(), plan.getTime())) {
 +      return WRITE_REJECT_BY_TIME;
 +    }
 +    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
 +      logNode.write(plan);
 +    }
 +    long memUsage = 0;
 +    TSDataType type;
 +    String measurement;
 +    for (int i = 0; i < plan.getMeasurements().length; i++) {
 +      measurement = plan.getMeasurements()[i];
 +      type = fileSchemaRef.getMeasurementDataType(measurement);
 +      memUsage += MemUtils.getPointSize(type, measurement);
 +    }
 +    UsageLevel level = BasicMemController.getInstance().acquireUsage(this, memUsage);
 +    switch (level) {
 +      case SAFE:
 +        doInsert(plan);
 +        checkMemThreshold4Flush(memUsage);
 +        return WRITE_SUCCESS;
 +      case WARNING:
 +        if (LOGGER.isWarnEnabled()) {
 +          LOGGER.warn("Memory usage will exceed warning threshold, current : {}.",
 +              MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
 +        }
 +        doInsert(plan);
 +        try {
 +          flush();
 +        } catch (IOException e) {
 +          throw new BufferWriteProcessorException(e);
 +        }
 +        return WRITE_IN_WARNING_MEM;
 +      case DANGEROUS:
 +        if (LOGGER.isWarnEnabled()) {
 +          LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.",
 +              MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage()));
 +        }
 +        return WRITE_REJECT_BY_MEM;
 +      default:
 +        return WRITE_REJECT_BY_MEM;
 +    }
 +  }
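 +
 +  // A minimal caller sketch (hypothetical, not from this codebase) showing how the return codes
 +  // above are meant to be handled; `processor` and `overflowProcessor` are assumed names, and
 +  // writeLock()/writeUnlock() stand for whatever acquires/releases this processor's
 +  // readWriteReentrantLock, as required by the class contract:
 +  //
 +  //   processor.writeLock();
 +  //   try {
 +  //     switch (processor.insert(plan)) {
 +  //       case TsFileProcessor.WRITE_REJECT_BY_TIME:
 +  //         overflowProcessor.insert(plan); // out-of-order data belongs to overflow
 +  //         break;
 +  //       case TsFileProcessor.WRITE_REJECT_BY_MEM:
 +  //         // memory is at the DANGEROUS level: reject the write or retry later
 +  //         break;
 +  //       default:
 +  //         break; // WRITE_SUCCESS or WRITE_IN_WARNING_MEM: the point has been inserted
 +  //     }
 +  //   } finally {
 +  //     processor.writeUnlock();
 +  //   }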
 +
 +  private void doInsert(InsertPlan plan) {
 +    String deviceId = plan.getDeviceId();
 +    long time = plan.getTime();
 +    TSDataType type;
 +    String measurement;
 +    for (int i = 0; i < plan.getMeasurements().length; i++) {
 +      measurement = plan.getMeasurements()[i];
 +      type = fileSchemaRef.getMeasurementDataType(measurement);
 +      workMemTable.write(deviceId, measurement, type, time, plan.getValues()[i]);
 +    }
 +    if (!minWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)) {
 +      minWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
 +    }
 +    if (!maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)
 +        || maxWrittenTimeForEachDeviceInCurrentFile.get(deviceId) < time) {
 +      maxWrittenTimeForEachDeviceInCurrentFile.put(deviceId, time);
 +    }
 +    valueCount++;
 +  }
 +
 +  /**
 +   * Delete data whose timestamp <= 'timestamp' and belonging to timeseries deviceId.measurementId.
 +   * Delete data in both working MemTable and flushing MemTable.
 +   *
 +   * @param deviceId the deviceId of the timeseries to be deleted.
 +   * @param measurementId the measurementId of the timeseries to be deleted.
 +   * @param timestamp the upper-bound of deletion time.
 +   */
 +  public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
 +    workMemTable.delete(deviceId, measurementId, timestamp);
 +    if (maxWrittenTimeForEachDeviceInCurrentFile.containsKey(deviceId)
 +        && maxWrittenTimeForEachDeviceInCurrentFile.get(deviceId) < timestamp) {
 +      maxWrittenTimeForEachDeviceInCurrentFile
 +          .put(deviceId, lastFlushedTimeForEachDevice.getOrDefault(deviceId, 0L));
 +    }
 +    boolean deleteFlushTable = false;
 +    if (isFlush()) {
 +      // flushing MemTable cannot be directly modified since another thread is reading it
 +      flushMemTable = flushMemTable.copy();
 +      deleteFlushTable = flushMemTable.delete(deviceId, measurementId, timestamp);
 +    }
 +    String fullPath = deviceId +
 +        IoTDBConstant.PATH_SEPARATOR + measurementId;
 +    Deletion deletion = new Deletion(fullPath, versionController.nextVersion(), timestamp);
 +    if (deleteFlushTable || (currentResource.containsDevice(deviceId)
 +        && currentResource.getStartTime(deviceId) <= timestamp)) {
 +      currentResource.getModFile().write(deletion);
 +    }
 +    for (TsFileResource resource : tsFileResources) {
 +      if (resource.containsDevice(deviceId) && resource.getStartTime(deviceId) <= timestamp) {
 +        resource.getModFile().write(deletion);
 +      }
 +    }
 +    if (lastFlushedTimeForEachDevice.containsKey(deviceId)
 +        && lastFlushedTimeForEachDevice.get(deviceId) <= timestamp) {
 +      lastFlushedTimeForEachDevice.put(deviceId, 0L);
 +    }
 +  }
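 +
 +  // Worked example (illustrative): delete("root.sg.d1", "s1", 50) removes all points of
 +  // root.sg.d1.s1 with time <= 50 from the working memtable and, if a flush is running, from a
 +  // copy of the flushing memtable; it also writes a Deletion record to the modification file
 +  // (ModFile) of the unsealed file and of every sealed TsFile whose start time for that device
 +  // is <= 50.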
 +
 +
 +  private void checkMemThreshold4Flush(long addedMemory) throws BufferWriteProcessorException {
 +    long newMem = memSize.addAndGet(addedMemory);
 +    if (newMem > TSFileConfig.groupSizeInByte) {
 +      if (LOGGER.isInfoEnabled()) {
 +        String usageMem = MemUtils.bytesCntToStr(newMem);
 +        String threshold = MemUtils.bytesCntToStr(TSFileConfig.groupSizeInByte);
 +        LOGGER.info("The usage of memory {} in bufferwrite processor {} reaches the threshold {}",
 +            usageMem, processorName, threshold);
 +      }
 +      try {
 +        flush();
 +      } catch (IOException e) {
 +        LOGGER.error("Flush bufferwrite error.", e);
 +        throw new BufferWriteProcessorException(e);
 +      }
 +    }
 +  }
 +
 +
 +  /**
 +   * this method prepares a flush task and then submits it.
 +   * @return a Future of the flush task; its result is true if the flush succeeds.
 +   * @throws IOException if the before-flush action fails.
 +   */
 +  @Override
 +  public Future<Boolean> flush() throws IOException {
 +    // waiting for the end of last flush operation.
 +    try {
 +      flushFuture.get();
 +    } catch (InterruptedException | ExecutionException e) {
 +      LOGGER.error(
 +          "Encounter an interrupt error when waitting for the flushing, the TsFile Processor is {}.",
 +          getProcessorName(), e);
 +      Thread.currentThread().interrupt();
 +    }
 +    // statistic information for flush
 +    if (valueCount <= 0) {
 +      LOGGER.debug(
 +          "TsFile Processor {} has zero data to be flushed, will return directly.", processorName);
 +      flushFuture = new ImmediateFuture<>(true);
 +      return flushFuture;
 +    }
 +
 +    if (LOGGER.isInfoEnabled()) {
 +      long thisFlushTime = System.currentTimeMillis();
 +      LOGGER.info(
 +          "The TsFile Processor {}: last flush time is {}, this flush time is {}, "
 +              + "flush time interval is {} s", getProcessorName(),
 +          DatetimeUtils.convertMillsecondToZonedDateTime(fileNamePrefix / 1000),
 +          DatetimeUtils.convertMillsecondToZonedDateTime(thisFlushTime),
 +          (thisFlushTime - fileNamePrefix / 1000) / 1000);
 +    }
 +    fileNamePrefix = System.nanoTime();
 +
 +    // update the lastUpdateTime, prepare for flush
 +    try {
 +      beforeFlushAction.act();
 +    } catch (Exception e) {
 +      LOGGER.error("Failed to flush memtable into tsfile when calling the action function.");
 +      throw new IOException(e);
 +    }
 +    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
 +      logNode.notifyStartFlush();
 +    }
 +    valueCount = 0;
 +    switchWorkToFlush();
 +    long version = versionController.nextVersion();
 +    BasicMemController.getInstance().releaseUsage(this, memSize.get());
 +    memSize.set(0);
 +    // switch
 +    flushFuture = FlushManager.getInstance().submit(() -> flushTask(version));
 +    return flushFuture;
 +  }
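 +
 +  // A usage sketch (e.g., in a test; `processor` is an assumed variable name): flush() only
 +  // submits the task, so a caller that needs the data on disk must block on the returned future
 +  // (getFlushFuture() exposes the same future):
 +  //
 +  //   Future<Boolean> future = processor.flush();
 +  //   boolean ok = future.get(); // waits until the flush task finishes; true on success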
 +
 +  /**
 +   * this method may run concurrently with other methods.
 +   *
 +   * @param version the operation version that will be tagged on the memtable to be flushed (i.e.,
 +   * the ChunkGroup)
 +   * @return true if the flush succeeds.
 +   */
 +  private boolean flushTask(long version) {
 +    boolean result;
 +    long flushStartTime = System.currentTimeMillis();
 +    LOGGER.info("The TsFile Processor {} starts flushing.", processorName);
 +    try {
 +      if (flushMemTable != null && !flushMemTable.isEmpty()) {
 +        // flush data
 +        MemTableFlushUtil.flushMemTable(fileSchemaRef, writer, flushMemTable,
 +            version);
 +        // write restore information
 +        writer.flush();
 +      } else {
 +        //no need to flush.
 +        return true;
 +      }
 +
 +      afterFlushAction.act();
 +      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
 +        logNode.notifyEndFlush(null);
 +      }
 +      result = true;
 +    } catch (Exception e) {
 +      LOGGER.error(
 +          "The TsFile Processor {} failed to flush, when calling the afterFlushAction(filenodeFlushAction).",
 +          processorName, e);
 +      result = false;
 +    } finally {
 +      try {
 +        switchFlushToWork();
 +      } catch (BufferWriteProcessorException e) {
 +        LOGGER.error(e.getMessage());
 +        result = false;
 +      }
 +      LOGGER.info("The TsFile Processor {} ends flushing.", processorName);
 +    }
 +    if (LOGGER.isInfoEnabled()) {
 +      long flushEndTime = System.currentTimeMillis();
 +      LOGGER.info(
 +          "The TsFile Processor {} flush, start time is {}, flush end time is {}, "
 +              + "flush time consumption is {}ms",
 +          processorName,
 +          DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
 +          DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
 +          flushEndTime - flushStartTime);
 +    }
 +    return result;
 +  }
 +
 +  private void switchWorkToFlush() {
 +    flushQueryLock.lock();
 +    try {
 +      if (flushMemTable == null) {
 +        flushMemTable = workMemTable;
 +        for (String device : flushMemTable.getMemTableMap().keySet()) {
 +          lastFlushedTimeForEachDevice
 +              .put(device, maxWrittenTimeForEachDeviceInCurrentFile.get(device));
 +        }
 +        workMemTable = new PrimitiveMemTable();
 +      }
 +    } finally {
 +      flushQueryLock.unlock();
 +    }
 +  }
 +
 +  private void switchFlushToWork() throws BufferWriteProcessorException {
 +    flushQueryLock.lock();
 +    try {
 +      //we update the index of currentTsResource.
 +      for (String device : flushMemTable.getMemTableMap().keySet()) {
 +        currentResource.setStartTime(device, minWrittenTimeForEachDeviceInCurrentFile.get(device));
 +        // new end time must be larger than the old one.
 +        currentResource.setEndTime(device, lastFlushedTimeForEachDevice.get(device));
 +      }
 +      flushMemTable.clear();
 +      flushMemTable = null;
 +      //make the chunk groups written by this flush task visible
 +      writer.appendMetadata();
 +      if (needCloseCurrentFile()) {
 +        closeCurrentTsFileAndOpenNewOne();
 +      }
 +    } finally {
 +      flushQueryLock.unlock();
 +    }
 +  }
 +
 +  @Override
 +  public boolean canBeClosed() {
 +    return true;
 +  }
 +
 +
 +  /**
 +   * this method does not call flush() to flush the in-memory data to disk.
 +   * @throws BufferWriteProcessorException if closing the current file fails
 +   */
 +  private void closeCurrentTsFileAndOpenNewOne() throws BufferWriteProcessorException {
 +    closeCurrentFile();
 +    initCurrentTsFile(generateNewTsFilePath());
 +  }
 +
 +  //very dangerous: how can we make sure this function is thread safe (i.e., that no other
 +  // functions are running)?
 +  private void closeCurrentFile() throws BufferWriteProcessorException {
 +    try {
 +      long closeStartTime = System.currentTimeMillis();
 +      // end file
 +      if (writer.getChunkGroupMetaDatas().isEmpty()) {
 +        //this is an empty TsFile, we do not need to save it...
 +        writer.endFile(fileSchemaRef);
 +        Files.delete(Paths.get(insertFile.getAbsolutePath()));
 +      } else {
 +        writer.endFile(fileSchemaRef);
 +        // update the IntervalFile for interval list
 +        afterCloseAction.act();
 +        // flush the changed information for filenode
 +        afterFlushAction.act();
 +
 +        tsFileResources.add(currentResource);
 +        //maintain the inverse index
 +        for (String device : currentResource.getDevices()) {
 +          inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>())
 +              .add(currentResource);
 +        }
 +      }
 +
 +      // delete the restore for this bufferwrite processor
 +      if (LOGGER.isInfoEnabled()) {
 +        long closeEndTime = System.currentTimeMillis();
 +        LOGGER.info(
 +            "Close current TsFile {}, start time is {}, end time is {}, time consumption is {}ms",
 +            insertFile.getAbsolutePath(),
 +            DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
 +            DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
 +            closeEndTime - closeStartTime);
 +      }
 +
 +      workMemTable.clear();
 +      if (flushMemTable != null) {
 +        flushMemTable.clear();
 +      }
 +      if (logNode != null) {
 +        logNode.close();
 +      }
 +
 +    } catch (IOException e) {
 +      LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.",
 +          getProcessorName(), e);
 +      throw new BufferWriteProcessorException(e);
 +    } catch (Exception e) {
 +      LOGGER
 +          .error("Failed to close the bufferwrite processor when calling the action function.", e);
 +      throw new BufferWriteProcessorException(e);
 +    }
 +  }
 +
 +  @Override
 +  public long memoryUsage() {
 +    return 0;
 +  }
 +
 +  /**
 +   * check whether a flush task is in progress.
 +   *
 +   * @return true if flushing
 +   */
 +  public boolean isFlush() {
 +    // starting a flush task has two steps: set the flushMemTable, and then set the flushFuture.
 +    // So, the following case exists: flushMemTable != null but flushFuture is done (because
 +    // flushFuture refers to the last finished flush).
 +    // And the opposite case exists, too: flushMemTable == null, but flushFuture is not done
 +    // (flushTask() has not finished, but switchFlushToWork() has already run).
 +    // So, checking flushMemTable is more meaningful than flushFuture.isDone().
 +    return flushMemTable != null;
 +  }
 +
 +
 +  /**
 +   * query data.
 +   */
 +  public GlobalSortedSeriesDataSource query(SingleSeriesExpression expression,
 +      QueryContext context) throws IOException {
 +    MeasurementSchema mSchema;
 +    TSDataType dataType;
 +
 +    String deviceId = expression.getSeriesPath().getDevice();
 +    String measurementId = expression.getSeriesPath().getMeasurement();
 +
 +    mSchema = fileSchemaRef.getMeasurementSchema(measurementId);
 +    dataType = mSchema.getType();
 +
 +    // tsfile data
 +    List<TsFileResource> dataFiles = new ArrayList<>();
 +    for (TsFileResource tsfile : tsFileResources) {
 +      //TODO in the old version, tsfile is deep copied. I do not know why
 +      dataFiles.add(tsfile);
 +    }
 +    // bufferwrite data
 +    //TODO unsealedTsFile class is a little redundant.
 +    UnsealedTsFile unsealedTsFile = null;
 +
 +    if (currentResource.getStartTime(deviceId) >= 0) {
 +      unsealedTsFile = new UnsealedTsFile();
 +      unsealedTsFile.setFilePath(currentResource.getFile().getAbsolutePath());
 +      List<ChunkMetaData> chunks = writer.getMetadatas(deviceId, measurementId, dataType);
 +      List<Modification> pathModifications = context.getPathModifications(
 +          currentResource.getModFile(), deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId);
 +      if (!pathModifications.isEmpty()) {
 +        QueryUtils.modifyChunkMetaData(chunks, pathModifications);
 +      }
 +
 +      unsealedTsFile.setTimeSeriesChunkMetaDatas(chunks);
 +    }
 +    return new GlobalSortedSeriesDataSource(
 +        new Path(deviceId + "." + measurementId), dataFiles, unsealedTsFile,
 +        queryDataInMemtable(deviceId, measurementId, dataType, mSchema.getProps()));
 +  }
 +
 +  /**
 +   * get the chunk in the working memtable (and, if one exists, the chunk in flushing status),
 +   * compact them into one TimeValuePairSorter, and then get its (or their) ChunkMetadata(s).
 +   *
 +   * @param deviceId device id
 +   * @param measurementId sensor id
 +   * @param dataType data type
 +   * @return corresponding chunk data in memory
 +   */
 +  private ReadOnlyMemChunk queryDataInMemtable(String deviceId,
 +      String measurementId, TSDataType dataType, Map<String, String> props) {
 +    flushQueryLock.lock();
 +    try {
 +      MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
 +      if (flushMemTable != null) {
 +        memSeriesLazyMerger.addMemSeries(flushMemTable.query(deviceId, measurementId, dataType, props));
 +      }
 +      memSeriesLazyMerger.addMemSeries(workMemTable.query(deviceId, measurementId, dataType, props));
 +      // memSeriesLazyMerger has handled the props,
 +      // so we do not need to handle it again in the following readOnlyMemChunk
 +      return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger, Collections.emptyMap());
 +    } finally {
 +      flushQueryLock.unlock();
 +    }
 +  }
 +
 +
 +  public String getInsertFilePath() {
 +    return insertFile.getAbsolutePath();
 +  }
 +
 +  public WriteLogNode getLogNode() {
 +    return logNode;
 +  }
 +
 +  /**
 +   * used for tests, so that we can tell when flush() was last called.
 +   *
 +   * @return the last flush() time. Time unit: nanosecond.
 +   */
 +  public long getFileNamePrefix() {
 +    return fileNamePrefix;
 +  }
 +
 +  /**
 +   * used for test. We can block to wait for finishing flushing.
 +   *
 +   * @return the future of the flush() task.
 +   */
 +  public Future<Boolean> getFlushFuture() {
 +    return flushFuture;
 +  }
 +
 +  @Override
 +  public void close() throws BufferWriteProcessorException {
 +    closeCurrentFile();
 +    try {
 +      if (currentResource != null) {
 +        currentResource.close();
 +      }
 +      for (TsFileResource resource : tsFileResources) {
 +        resource.close();
 +      }
 +
 +    } catch (IOException e) {
 +      throw new BufferWriteProcessorException(e);
 +    }
 +  }
 +
 +  /**
 +   * remove all data of this processor. Used for UT.
 +   */
 +  public void removeMe() throws BufferWriteProcessorException, IOException {
 +    try {
 +      flushFuture.get(10000, TimeUnit.MILLISECONDS);
 +    } catch (ExecutionException | TimeoutException e) {
 +      LOGGER.error("can not end running flush task in 10 seconds: {}", e.getMessage());
 +    } catch (InterruptedException e) {
 +      LOGGER.error("current running flush task is interrupted.", e);
 +      Thread.currentThread().interrupt();
 +    }
 +    close();
 +    for (String folder : Directories.getInstance().getAllTsFileFolders()) {
 +      File dataFolder = new File(folder, processorName);
 +      if (dataFolder.exists()) {
 +        for (File file: dataFolder.listFiles()) {
 +          Files.deleteIfExists(Paths.get(file.getAbsolutePath()));
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Check if the current TsFile (currentResource) is too large.
 +   * @return true if the file is too large.
 +   */
 +  private boolean needCloseCurrentFile() {
 +    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 +    long fileSize = currentResource.getFile().length();
 +    if (fileSize >= config.getBufferwriteFileSizeThreshold()) {
 +      if (LOGGER.isInfoEnabled()) {
 +        LOGGER.info(
 +            "The bufferwrite processor {}, size({}) of the file {} reaches threshold {}.",
 +            processorName, MemUtils.bytesCntToStr(fileSize), currentResource.getFilePath(),
 +            MemUtils.bytesCntToStr(config.getBufferwriteFileSizeThreshold()));
 +      }
 +      return true;
 +    }
 +    return false;
 +  }
 +
 +  protected List<String> getAllDataFolders() {
 +    return Directories.getInstance().getAllTsFileFolders();
 +  }
 +
 +  protected String getNextDataFolder() {
 +    return Directories.getInstance().getNextFolderForTsfile();
 +  }
 +
 +  @Override
 +  public boolean equals(Object o) {
 +    if (this == o) {
 +      return true;
 +    }
 +    if (!(o instanceof TsFileProcessor)) {
 +      return false;
 +    }
 +    if (!super.equals(o)) {
 +      return false;
 +    }
-     TsFileProcessor that = (TsFileProcessor) o;
-     return fileNamePrefix == that.fileNamePrefix &&
-         valueCount == that.valueCount &&
-         Objects.equals(fileSchemaRef, that.fileSchemaRef) &&
-         Objects.equals(flushFuture, that.flushFuture) &&
-         Objects.equals(flushQueryLock, that.flushQueryLock) &&
-         Objects.equals(memSize, that.memSize) &&
-         Objects.equals(workMemTable, that.workMemTable) &&
-         Objects.equals(flushMemTable, that.flushMemTable) &&
-         Objects.equals(writer, that.writer) &&
-         Objects.equals(beforeFlushAction, that.beforeFlushAction) &&
-         Objects.equals(afterCloseAction, that.afterCloseAction) &&
-         Objects.equals(afterFlushAction, that.afterFlushAction) &&
-         Objects.equals(insertFile, that.insertFile) &&
-         Objects.equals(currentResource, that.currentResource) &&
-         Objects.equals(tsFileResources, that.tsFileResources) &&
-         Objects.equals(inverseIndexOfResource, that.inverseIndexOfResource) &&
-         Objects.equals(lastFlushedTimeForEachDevice, that.lastFlushedTimeForEachDevice) &&
-         Objects.equals(logNode, that.logNode) &&
-         Objects.equals(versionController, that.versionController);
++    return this == o;
 +  }
 +
 +  @Override
 +  public int hashCode() {
-     return Objects
-         .hash(super.hashCode(), fileSchemaRef, flushFuture, flushQueryLock, memSize, fileNamePrefix,
-             valueCount, workMemTable, flushMemTable, writer, beforeFlushAction, afterCloseAction,
-             afterFlushAction, insertFile, currentResource, tsFileResources, inverseIndexOfResource,
-             lastFlushedTimeForEachDevice, logNode, versionController);
++    return super.hashCode();
 +  }
 +}