You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/20 08:33:17 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated (8a0c12f -> f28b671)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 8a0c12f  add javadocs
     new 2d19bdb  remove FileNodeRestore file
     new e89b1b6  add filenodeManager
     new f28b671  Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iotdb/db/engine/UnsealedTsFileProcessorV2.java |  12 +-
 .../iotdb/db/engine/filenode/FileNodeManager.java  |  59 ++--
 .../db/engine/filenode/FileNodeProcessor.java      |   2 +-
 .../db/engine/filenodeV2/FileNodeManagerV2.java    | 323 ++++++++++++++++++++-
 .../filenodeV2/FileNodeProcessorStoreV2.java       | 173 -----------
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 212 +++++++-------
 .../apache/iotdb/db/engine/memtable/Callback.java  |  25 --
 .../db/engine/memtable/MemTableFlushTaskV2.java    |   6 +-
 8 files changed, 466 insertions(+), 346 deletions(-)
 delete mode 100644 iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java
 delete mode 100644 iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java


[incubator-iotdb] 02/03: add filenodeManager

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit e89b1b64d7cdf8279f32497c56e3989795624c84
Author: lta <li...@163.com>
AuthorDate: Thu Jun 20 16:32:43 2019 +0800

    add filenodeManager
---
 .../iotdb/db/engine/UnsealedTsFileProcessorV2.java |   2 +-
 .../iotdb/db/engine/filenode/FileNodeManager.java  |  59 ++--
 .../db/engine/filenodeV2/FileNodeManagerV2.java    | 323 ++++++++++++++++++++-
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 121 ++++++--
 4 files changed, 443 insertions(+), 62 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
index 7111873..5249560 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
@@ -162,7 +162,7 @@ public class UnsealedTsFileProcessorV2 {
    */
   public void flushOneMemTable() throws IOException {
     IMemTable memTableToFlush = flushingMemTables.pollFirst();
-    // null memtable only appears when calling forceClose()
+    // null memtable only appears when calling asyncForceClose()
     if (memTableToFlush != null) {
       MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName,
           this::releaseFlushedMemTable);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 5dfa089..9e42101 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -930,34 +930,6 @@ public class FileNodeManager implements IStatistic, IService {
   }
 
   /**
-   * try to setCloseMark the filenode processor. The name of filenode processor is processorName
-   */
-  private boolean closeOneProcessor(String processorName) throws FileNodeManagerException {
-    if (!processorMap.containsKey(processorName)) {
-      return true;
-    }
-
-    Processor processor = processorMap.get(processorName);
-    if (processor.tryWriteLock()) {
-      try {
-        if (processor.canBeClosed()) {
-          processor.close();
-          return true;
-        } else {
-          return false;
-        }
-      } catch (ProcessorException e) {
-        LOGGER.error("Close the filenode processor {} error.", processorName, e);
-        throw new FileNodeManagerException(e);
-      } finally {
-        processor.writeUnlock();
-      }
-    } else {
-      return false;
-    }
-  }
-
-  /**
    * delete one filenode.
    */
   public void deleteOneFileNode(String processorName) throws FileNodeManagerException {
@@ -1096,8 +1068,39 @@ public class FileNodeManager implements IStatistic, IService {
     }
   }
 
+
+  /**
+   * try to setCloseMark the filenode processor. The name of filenode processor is processorName
+   * notice: this method has the same function with close()
+   */
+  private boolean closeOneProcessor(String processorName) throws FileNodeManagerException {
+    if (!processorMap.containsKey(processorName)) {
+      return true;
+    }
+
+    Processor processor = processorMap.get(processorName);
+    if (processor.tryWriteLock()) {
+      try {
+        if (processor.canBeClosed()) {
+          processor.close();
+          return true;
+        } else {
+          return false;
+        }
+      } catch (ProcessorException e) {
+        LOGGER.error("Close the filenode processor {} error.", processorName, e);
+        throw new FileNodeManagerException(e);
+      } finally {
+        processor.writeUnlock();
+      }
+    } else {
+      return false;
+    }
+  }
+
   /**
    * try to setCloseMark the filenode processor.
+   * notice: This method has the same function with closeOneProcessor()
    */
   private void close(String processorName) throws FileNodeManagerException {
     if (!processorMap.containsKey(processorName)) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index 3d0ad48..d5938dc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -19,23 +19,45 @@
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
+import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.FileNodeProcessorException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.monitor.IStatistic;
+import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FileNodeManagerV2 implements IService {
+public class FileNodeManagerV2 implements IStatistic, IService {
 
   private static final Logger LOGGER = LoggerFactory
       .getLogger(org.apache.iotdb.db.engine.filenodeV2.FileNodeManagerV2.class);
@@ -52,7 +74,7 @@ public class FileNodeManagerV2 implements IService {
    * This map is used to manage all filenode processor,<br> the key is filenode name which is
    * storage group seriesPath.
    */
-  private ConcurrentHashMap<String, FileNodeProcessorV2> processorMap;
+  private final ConcurrentHashMap<String, FileNodeProcessorV2> processorMap;
 
   private static final FileNodeManagerV2 INSTANCE = new FileNodeManagerV2();
 
@@ -65,6 +87,11 @@ public class FileNodeManagerV2 implements IService {
    */
   private volatile FileNodeManagerStatus fileNodeManagerStatus = FileNodeManagerStatus.NONE;
 
+  /**
+   * There is no need to add concurrently
+   **/
+  private HashMap<String, AtomicLong> statParamsHashMap;
+
   private enum FileNodeManagerStatus {
     NONE, MERGE, CLOSE
   }
@@ -93,7 +120,11 @@ public class FileNodeManagerV2 implements IService {
 
   @Override
   public void stop() {
-
+    try {
+      syncCloseAllProcessor();
+    } catch (FileNodeManagerException e) {
+      LOGGER.error("Failed to setCloseMark file node manager because .", e);
+    }
   }
 
   @Override
@@ -103,7 +134,7 @@ public class FileNodeManagerV2 implements IService {
 
 
   private FileNodeProcessorV2 getProcessor(String devicePath)
-      throws FileNodeManagerException, FileNodeProcessorException {
+      throws FileNodeManagerException {
     String filenodeName;
     try {
       // return the storage group name
@@ -121,8 +152,14 @@ public class FileNodeManagerV2 implements IService {
         if (processor == null) {
           LOGGER.debug("construct a processor instance, the storage group is {}, Thread is {}",
               filenodeName, Thread.currentThread().getId());
-          processor = new FileNodeProcessorV2(baseDir, filenodeName);
-          processorMap.put(filenodeName, processor);
+          try {
+            processor = new FileNodeProcessorV2(baseDir, filenodeName);
+          } catch (FileNodeProcessorException e) {
+            throw new FileNodeManagerException(e);
+          }
+          synchronized (processorMap) {
+            processorMap.put(filenodeName, processor);
+          }
         }
       }
     }
@@ -130,13 +167,92 @@ public class FileNodeManagerV2 implements IService {
   }
 
 
+  private void updateStatHashMapWhenFail(TSRecord tsRecord) {
+    statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name())
+        .incrementAndGet();
+    statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name())
+        .addAndGet(tsRecord.dataPointList.size());
+  }
+
+  /**
+   * get stats parameter hash map.
+   *
+   * @return the key represents the params' name, values is AtomicLong type
+   */
+  @Override
+  public Map<String, AtomicLong> getStatParamsHashMap() {
+    return statParamsHashMap;
+  }
+
+  @Override
+  public List<String> getAllPathForStatistic() {
+    List<String> list = new ArrayList<>();
+    for (MonitorConstants.FileNodeManagerStatConstants statConstant :
+        MonitorConstants.FileNodeManagerStatConstants.values()) {
+      list.add(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPARATOR
+          + statConstant.name());
+    }
+    return list;
+  }
+
+  @Override
+  public Map<String, TSRecord> getAllStatisticsValue() {
+    long curTime = System.currentTimeMillis();
+    TSRecord tsRecord = StatMonitor
+        .convertToTSRecord(getStatParamsHashMap(), MonitorConstants.STAT_STORAGE_DELTA_NAME,
+            curTime);
+    HashMap<String, TSRecord> ret = new HashMap<>();
+    ret.put(MonitorConstants.STAT_STORAGE_DELTA_NAME, tsRecord);
+    return ret;
+  }
+
+  /**
+   * Init Stat MetaDta.
+   */
+  @Override
+  public void registerStatMetadata() {
+    Map<String, String> hashMap = new HashMap<>();
+    for (MonitorConstants.FileNodeManagerStatConstants statConstant :
+        MonitorConstants.FileNodeManagerStatConstants.values()) {
+      hashMap
+          .put(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPARATOR
+              + statConstant.name(), MonitorConstants.DATA_TYPE_INT64);
+    }
+    StatMonitor.getInstance().registerStatStorageGroup(hashMap);
+  }
+
+  private void updateStat(boolean isMonitor, TSRecord tsRecord) {
+    if (!isMonitor) {
+      statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS.name())
+          .addAndGet(tsRecord.dataPointList.size());
+    }
+  }
+
+  /**
+   * This function is just for unit test.
+   */
+  public synchronized void resetFileNodeManager() {
+    for (String key : statParamsHashMap.keySet()) {
+      statParamsHashMap.put(key, new AtomicLong());
+    }
+    processorMap.clear();
+  }
+
+
+
   /**
    * insert TsRecord into storage group.
    *
    * @param tsRecord input Data
+   * @param isMonitor if true, the insertion is done by StatMonitor and the statistic Info will not
+   * be recorded. if false, the statParamsHashMap will be updated.
    * @return an int value represents the insert type, 0: failed; 1: overflow; 2: bufferwrite
    */
-  public boolean insert(TSRecord tsRecord) {
+  public int insert(TSRecord tsRecord, boolean isMonitor) throws FileNodeManagerException {
+
+    checkTimestamp(tsRecord);
+
+    updateStat(isMonitor, tsRecord);
 
     FileNodeProcessorV2 fileNodeProcessor;
     try {
@@ -144,9 +260,10 @@ public class FileNodeManagerV2 implements IService {
     } catch (Exception e) {
       LOGGER.warn("get FileNodeProcessor of device {} failed, because {}", tsRecord.deviceId,
           e.getMessage(), e);
-      return false;
+      throw new FileNodeManagerException(e);
     }
 
+    // TODO monitor: update statistics
     return fileNodeProcessor.insert(tsRecord);
   }
 
@@ -169,4 +286,194 @@ public class FileNodeManagerV2 implements IService {
     }
   }
 
+  private void checkTimestamp(TSRecord tsRecord) throws FileNodeManagerException {
+    if (tsRecord.time < 0) {
+      LOGGER.error("The insert time lt 0, {}.", tsRecord);
+      throw new FileNodeManagerException("The insert time lt 0, the tsrecord is " + tsRecord);
+    }
+  }
+
+  /**
+   * recovery the filenode processor.
+   */
+  public void recovery() {
+    // TODO
+  }
+
+
+  private void writeLog(TSRecord tsRecord, boolean isMonitor, WriteLogNode logNode)
+      throws FileNodeManagerException {
+    // TODO
+  }
+
+
+  /**
+   * update data.
+   */
+  public void update(String deviceId, String measurementId, long startTime, long endTime,
+      TSDataType type, String v) {
+    // TODO
+  }
+
+  /**
+   * delete data.
+   */
+  public void delete(String deviceId, String measurementId, long timestamp)
+      throws FileNodeManagerException {
+    // TODO
+  }
+
+  private void delete(String processorName,
+      Iterator<Entry<String, FileNodeProcessor>> processorIterator)
+      throws FileNodeManagerException {
+    // TODO
+  }
+
+
+  /**
+   * begin query.
+   *
+   * @param deviceId queried deviceId
+   * @return a query token for the device.
+   */
+  public int beginQuery(String deviceId) throws FileNodeManagerException {
+    // TODO
+    return -1;
+  }
+
+  /**
+   * end query.
+   */
+  public void endQuery(String deviceId, int token) throws FileNodeManagerException {
+    // TODO
+  }
+
+  /**
+   * query data.
+   */
+  public QueryDataSourceV2 query(SingleSeriesExpression seriesExpression, QueryContext context)
+      throws FileNodeManagerException {
+    String deviceId = seriesExpression.getSeriesPath().getDevice();
+    String measurementId = seriesExpression.getSeriesPath().getMeasurement();
+    FileNodeProcessorV2 fileNodeProcessor = getProcessor(deviceId);
+    LOGGER.debug("Get the FileNodeProcessor: filenode is {}, query.",
+        fileNodeProcessor.getStorageGroupName());
+    return fileNodeProcessor.query(deviceId, measurementId);
+  }
+
+  /**
+   * Append one specified tsfile to the storage group. <b>This method is only provided for
+   * transmission module</b>
+   *
+   * @param fileNodeName the seriesPath of storage group
+   * @param appendFile the appended tsfile information
+   */
+  public boolean appendFileToFileNode(String fileNodeName, TsFileResource appendFile,
+      String appendFilePath) throws FileNodeManagerException {
+    // TODO
+    return true;
+  }
+
+  /**
+   * get all overlap tsfiles which are conflict with the appendFile.
+   *
+   * @param fileNodeName the seriesPath of storage group
+   * @param appendFile the appended tsfile information
+   */
+  public List<String> getOverlapFilesFromFileNode(String fileNodeName, TsFileResource appendFile,
+      String uuid) throws FileNodeManagerException {
+    // TODO
+    return null;
+  }
+
+
+  /**
+   * merge all overflowed filenode.
+   *
+   * @throws FileNodeManagerException FileNodeManagerException
+   */
+  public void mergeAll() throws FileNodeManagerException {
+    // TODO
+  }
+
+  /**
+   * try to setCloseMark the filenode processor. The name of filenode processor is processorName
+   */
+  private boolean tryToCloseFileNodeProcessor(String processorName) throws FileNodeManagerException {
+    // TODO
+    return false;
+  }
+
+  /**
+   * Force to setCloseMark the filenode processor.
+   */
+  public void deleteOneFileNode(String processorName) throws FileNodeManagerException {
+    if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
+      return;
+    }
+
+    fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
+    try {
+      if (processorMap.containsKey(processorName)) {
+        deleteFileNodeBlocked(processorName);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Delete the filenode processor {} error.", processorName, e);
+      throw new FileNodeManagerException(e);
+    } finally {
+      fileNodeManagerStatus = FileNodeManagerStatus.NONE;
+    }
+  }
+
+  private void deleteFileNodeBlocked(String processorName) throws IOException {
+    LOGGER.info("Forced to delete the filenode processor {}", processorName);
+    FileNodeProcessorV2 processor = processorMap.get(processorName);
+    processor.syncCloseFileNode(() -> {
+      String fileNodePath = IoTDBDescriptor.getInstance().getConfig().getFileNodeDir();
+      fileNodePath = FilePathUtils.regularizePath(fileNodePath) + processorName;
+      try {
+        FileUtils.deleteDirectory(new File(fileNodePath));
+      } catch (IOException e) {
+        LOGGER.error("Delete tsfiles failed", e);
+      }
+      synchronized (processorMap) {
+        processorMap.remove(processorName);
+      }
+      return true;
+    });
+  }
+
+
+  /**
+   * add time series.
+   */
+  public void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding,
+      CompressionType compressor,
+      Map<String, String> props) throws FileNodeManagerException {
+    FileNodeProcessorV2 fileNodeProcessor = getProcessor(path.getFullPath());
+    fileNodeProcessor.addTimeSeries(path.getMeasurement(), dataType, encoding, compressor, props);
+  }
+
+
+  /**
+   * delete all filenode.
+   */
+  public synchronized boolean deleteAll() throws FileNodeManagerException {
+    LOGGER.info("Start deleting all filenode");
+    // TODO
+    return true;
+  }
+
+  /**
+   * Sync asyncCloseOneProcessor all file node processors.
+   */
+  public void syncCloseAllProcessor() throws FileNodeManagerException {
+    LOGGER.info("Start closing all filenode processor");
+    synchronized (processorMap){
+      for(FileNodeProcessorV2 processor: processorMap.values()){
+        processor.asyncForceClose();
+      }
+    }
+  }
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 6dedfa8..ce8fbba 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -23,12 +23,13 @@ import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.UnsealedTsFileProcessorV2;
@@ -66,12 +67,12 @@ public class FileNodeProcessorV2 {
 
   // includes sealed and unsealed sequnce tsfiles
   private List<TsFileResourceV2> sequenceFileList = new ArrayList<>();
-  private UnsealedTsFileProcessorV2 workUnsealedSequenceTsFileProcessor = null;
+  private UnsealedTsFileProcessorV2 workSequenceTsFileProcessor = null;
   private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
 
   // includes sealed and unsealed unsequnce tsfiles
   private List<TsFileResourceV2> unSequenceFileList = new ArrayList<>();
-  private UnsealedTsFileProcessorV2 workUnsealedUnSequenceTsFileProcessor = null;
+  private UnsealedTsFileProcessorV2 workUnSequenceTsFileProcessor = null;
   private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
 
   /**
@@ -88,12 +89,20 @@ public class FileNodeProcessorV2 {
 
   private final ReadWriteLock lock;
 
+  private Condition closeFileNodeCondition;
+
+  /**
+   * Mark whether to close file node
+   */
+  private volatile boolean toBeClosed;
+
   private VersionController versionController;
 
   public FileNodeProcessorV2(String absoluteBaseDir, String storageGroupName)
       throws FileNodeProcessorException {
     this.storageGroupName = storageGroupName;
     lock = new ReentrantReadWriteLock();
+    closeFileNodeCondition = lock.writeLock().newCondition();
 
     File storageGroupDir = new File(absoluteBaseDir, storageGroupName);
     if (!storageGroupDir.exists()) {
@@ -160,29 +169,40 @@ public class FileNodeProcessorV2 {
     }
   }
 
-  public boolean insert(TSRecord tsRecord) {
+  /**
+   *
+   * @param tsRecord
+   * @return -1: failed, 1: Overflow, 2:Bufferwrite
+   */
+  public int insert(TSRecord tsRecord) {
     lock.writeLock().lock();
-    boolean result;
+    int insertResult;
 
     try {
+      if(toBeClosed){
+        return -1;
+      }
       // init map
       latestTimeForEachDevice.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
       latestFlushedTimeForEachDevice.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
 
+      boolean result;
       // write to sequence or unsequence file
       if (tsRecord.time > latestFlushedTimeForEachDevice.get(tsRecord.deviceId)) {
-        result = writeUnsealedDataFile(workUnsealedSequenceTsFileProcessor, tsRecord, true);
+        result = writeUnsealedDataFile(workSequenceTsFileProcessor, tsRecord, true);
+        insertResult = result ? 1 : -1;
       } else {
-        result = writeUnsealedDataFile(workUnsealedUnSequenceTsFileProcessor, tsRecord, false);
+        result = writeUnsealedDataFile(workUnSequenceTsFileProcessor, tsRecord, false);
+        insertResult = result ? 2 : -1;
       }
     } catch (Exception e) {
       LOGGER.error("insert tsRecord to unsealed data file failed, because {}", e.getMessage(), e);
-      result = false;
+      insertResult = -1;
     } finally {
       lock.writeLock().unlock();
     }
 
-    return result;
+    return insertResult;
   }
 
   private boolean writeUnsealedDataFile(UnsealedTsFileProcessorV2 unsealedTsFileProcessor,
@@ -282,10 +302,10 @@ public class FileNodeProcessorV2 {
     if (unsealedTsFileProcessor.shouldClose()) {
       if (sequence) {
         closingSequenceTsFileProcessor.add(unsealedTsFileProcessor);
-        workUnsealedSequenceTsFileProcessor = null;
+        workSequenceTsFileProcessor = null;
       } else {
         closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor);
-        workUnsealedUnSequenceTsFileProcessor = null;
+        workUnSequenceTsFileProcessor = null;
       }
       unsealedTsFileProcessor.setCloseMark();
     }
@@ -304,29 +324,80 @@ public class FileNodeProcessorV2 {
    */
   // TODO please consider concurrency with query and write method.
   private void closeUnsealedTsFileProcessor(UnsealedTsFileProcessorV2 bufferWriteProcessor) {
-    closingSequenceTsFileProcessor.remove(bufferWriteProcessor);
-    // end time with one start time
-    Map<String, Long> endTimeMap = new HashMap<>();
-    TsFileResourceV2 resource = workUnsealedSequenceTsFileProcessor.getTsFileResource();
-    synchronized (resource) {
-      for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
-        String deviceId = startTime.getKey();
-        endTimeMap.put(deviceId, latestTimeForEachDevice.get(deviceId));
+    lock.writeLock().unlock();
+    try {
+      closingSequenceTsFileProcessor.remove(bufferWriteProcessor);
+      // end time with one start time
+      Map<String, Long> endTimeMap = new HashMap<>();
+      TsFileResourceV2 resource = workSequenceTsFileProcessor.getTsFileResource();
+      synchronized (resource) {
+        for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
+          String deviceId = startTime.getKey();
+          endTimeMap.put(deviceId, latestTimeForEachDevice.get(deviceId));
+        }
+        resource.setEndTimeMap(endTimeMap);
       }
-      resource.setEndTimeMap(endTimeMap);
+      closeFileNodeCondition.signal();
+    }finally {
+      lock.writeLock().unlock();
     }
   }
 
-  public void forceClose() {
+  public void asyncForceClose() {
     lock.writeLock().lock();
     try {
-      if (workUnsealedSequenceTsFileProcessor != null) {
-        closingSequenceTsFileProcessor.add(workUnsealedSequenceTsFileProcessor);
-        workUnsealedSequenceTsFileProcessor.forceClose();
-        workUnsealedSequenceTsFileProcessor = null;
+      if (workSequenceTsFileProcessor != null) {
+        closingSequenceTsFileProcessor.add(workSequenceTsFileProcessor);
+        workSequenceTsFileProcessor.forceClose();
+        workSequenceTsFileProcessor = null;
+      }
+      if (workUnSequenceTsFileProcessor != null) {
+        closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
+        workUnSequenceTsFileProcessor.forceClose();
+        workUnSequenceTsFileProcessor = null;
       }
     } finally {
       lock.writeLock().unlock();
     }
   }
+
+  /**
+   * Block this method until this file node can be closed.
+   */
+  public void syncCloseFileNode(Supplier<Boolean> removeProcessorFromManager){
+    lock.writeLock().lock();
+    try {
+      asyncForceClose();
+      toBeClosed = true;
+      while (true) {
+        if (unSequenceFileList.isEmpty() && sequenceFileList.isEmpty()
+            && workSequenceTsFileProcessor == null && workUnSequenceTsFileProcessor == null) {
+          removeProcessorFromManager.get();
+          break;
+        }
+        closeFileNodeCondition.await();
+      }
+    } catch (InterruptedException e) {
+      LOGGER.error("CloseFileNodeConditon occurs error while waiting for closing the file node {}",
+          storageGroupName, e);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  public UnsealedTsFileProcessorV2 getWorkSequenceTsFileProcessor() {
+    return workSequenceTsFileProcessor;
+  }
+
+  public UnsealedTsFileProcessorV2 getWorkUnSequenceTsFileProcessor() {
+    return workUnSequenceTsFileProcessor;
+  }
+
+  public String getStorageGroupName() {
+    return storageGroupName;
+  }
+
+  public int getClosingProcessorSize(){
+    return unSequenceFileList.size() + sequenceFileList.size();
+  }
 }


[incubator-iotdb] 01/03: remove FileNodeRestore file

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2d19bdbec97e2f02f324806b2a54574b9a603315
Author: lta <li...@163.com>
AuthorDate: Thu Jun 20 13:09:47 2019 +0800

    remove FileNodeRestore file
---
 .../iotdb/db/engine/UnsealedTsFileProcessorV2.java |  10 +-
 .../db/engine/filenode/FileNodeProcessor.java      |   2 +-
 .../filenodeV2/FileNodeProcessorStoreV2.java       | 173 ---------------------
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 119 ++++----------
 .../apache/iotdb/db/engine/memtable/Callback.java  |  25 ---
 .../db/engine/memtable/MemTableFlushTaskV2.java    |   6 +-
 6 files changed, 37 insertions(+), 298 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
index a089d0d..7111873 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
@@ -26,10 +26,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.bufferwriteV2.FlushManager;
 import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
-import org.apache.iotdb.db.engine.memtable.Callback;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushTaskV2;
@@ -73,7 +74,7 @@ public class UnsealedTsFileProcessorV2 {
 
   protected VersionController versionController;
 
-  private Callback<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor;
+  private Consumer<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor;
 
   /**
    * sync this object in query() and asyncFlush()
@@ -81,7 +82,8 @@ public class UnsealedTsFileProcessorV2 {
   private final LinkedList<IMemTable> flushingMemTables = new LinkedList<>();
 
   public UnsealedTsFileProcessorV2(String storageGroupName, File tsfile, FileSchema fileSchema,
-      VersionController versionController, Callback<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor)
+      VersionController versionController,
+      Consumer<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor)
       throws IOException {
     this.storageGroupName = storageGroupName;
     this.fileSchema = fileSchema;
@@ -182,7 +184,7 @@ public class UnsealedTsFileProcessorV2 {
     writer = null;
 
     // remove this processor from Closing list in FileNodeProcessor
-    closeUnsealedTsFileProcessor.call(this);
+    closeUnsealedTsFileProcessor.accept(this);
 
     // delete the restore for this bufferwrite processor
     if (LOGGER.isInfoEnabled()) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 082063a..7bc28c7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -202,7 +202,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
 //    @Override
 //    public void act() {
 //      synchronized (fileNodeProcessorStore) {
-//        fileNodeProcessorStore.setLatestTimeMap(lastUpdateTimeMap);
+//        fileNodeProcessorStore.setLatestFlushTimeForEachDevice(lastUpdateTimeMap);
 //        addLastTimeToIntervalFile();
 //        fileNodeProcessorStore.setSequenceFileList(newFileNodes);
 //      }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java
deleted file mode 100644
index d16006d..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenodeV2;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-/**
- * FileNodeProcessorStore is used to store information about FileNodeProcessor's status.
- * lastUpdateTime is changed and stored by BufferWrite flushMetadata or BufferWrite setCloseMark.
- * emptyTsFileResource and sequenceFileList are changed and stored by Overflow flushMetadata and
- * Overflow setCloseMark. fileNodeProcessorState is changed and stored by the change of FileNodeProcessor's
- * status such as "work->merge merge->wait wait->work". numOfMergeFile is changed and stored when
- * FileNodeProcessor's status changes from work to merge.
- */
-public class FileNodeProcessorStoreV2 implements Serializable {
-
-  private static final long serialVersionUID = -54525372941897565L;
-
-  private boolean isOverflowed;
-  private Map<String, Long> latestTimeMap;
-  private List<TsFileResourceV2> sequenceFileList;
-  private List<TsFileResourceV2> unSequenceFileList;
-  private int numOfMergeFile;
-  private FileNodeProcessorStatus fileNodeProcessorStatus;
-
-  /**
-   * Constructor of FileNodeProcessorStore.
-   *
-   * @param isOverflowed whether this FileNode contains unmerged Overflow operations.
-   * @param latestTimeMap the timestamp of last data point of each device in this FileNode.
-   * @param sequenceFileList sequnce tsfiles in the FileNode.
-   * @param unSequenceFileList unsequnce tsfiles in the FileNode
-   * @param fileNodeProcessorStatus the status of the FileNode.
-   * @param numOfMergeFile the number of files already merged in one merge operation.
-   */
-  public FileNodeProcessorStoreV2(boolean isOverflowed, Map<String, Long> latestTimeMap,
-      List<TsFileResourceV2> sequenceFileList, List<TsFileResourceV2> unSequenceFileList,
-      FileNodeProcessorStatus fileNodeProcessorStatus,
-      int numOfMergeFile) {
-    this.isOverflowed = isOverflowed;
-    this.latestTimeMap = latestTimeMap;
-    this.sequenceFileList = sequenceFileList;
-    this.unSequenceFileList = unSequenceFileList;
-    this.fileNodeProcessorStatus = fileNodeProcessorStatus;
-    this.numOfMergeFile = numOfMergeFile;
-  }
-
-  public void serialize(OutputStream outputStream) throws IOException {
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-    ReadWriteIOUtils.write(this.isOverflowed, byteArrayOutputStream);
-    // latestTimeMap
-    ReadWriteIOUtils.write(latestTimeMap.size(), byteArrayOutputStream);
-    for (Entry<String, Long> entry : latestTimeMap.entrySet()) {
-      ReadWriteIOUtils.write(entry.getKey(), byteArrayOutputStream);
-      ReadWriteIOUtils.write(entry.getValue(), byteArrayOutputStream);
-    }
-    ReadWriteIOUtils.write(this.sequenceFileList.size(), byteArrayOutputStream);
-    for (TsFileResourceV2 tsFileResource : this.sequenceFileList) {
-      tsFileResource.serialize(byteArrayOutputStream);
-    }
-    ReadWriteIOUtils.write(this.unSequenceFileList.size(), byteArrayOutputStream);
-    for (TsFileResourceV2 tsFileResource : this.unSequenceFileList) {
-      tsFileResource.serialize(byteArrayOutputStream);
-    }
-    ReadWriteIOUtils.write(this.numOfMergeFile, byteArrayOutputStream);
-    ReadWriteIOUtils.write(this.fileNodeProcessorStatus.serialize(), byteArrayOutputStream);
-    // buffer array to outputstream
-    byteArrayOutputStream.writeTo(outputStream);
-  }
-
-  public static FileNodeProcessorStoreV2 deSerialize(InputStream inputStream) throws IOException {
-    boolean isOverflowed = ReadWriteIOUtils.readBool(inputStream);
-    Map<String, Long> lastUpdateTimeMap = new HashMap<>();
-    int size = ReadWriteIOUtils.readInt(inputStream);
-    for (int i = 0; i < size; i++) {
-      String path = ReadWriteIOUtils.readString(inputStream);
-      long time = ReadWriteIOUtils.readLong(inputStream);
-      lastUpdateTimeMap.put(path, time);
-    }
-    size = ReadWriteIOUtils.readInt(inputStream);
-    List<TsFileResourceV2> sequenceFileList = new ArrayList<>();
-    for (int i = 0; i < size; i++) {
-      sequenceFileList.add(TsFileResourceV2.deSerialize(inputStream));
-    }
-    size = ReadWriteIOUtils.readInt(inputStream);
-    List<TsFileResourceV2> unsequenceFileList = new ArrayList<>();
-    for (int i = 0; i < size; i++) {
-      unsequenceFileList.add(TsFileResourceV2.deSerialize(inputStream));
-    }
-    int numOfMergeFile = ReadWriteIOUtils.readInt(inputStream);
-    FileNodeProcessorStatus fileNodeProcessorStatus = FileNodeProcessorStatus
-        .deserialize(ReadWriteIOUtils.readShort(inputStream));
-
-    return new FileNodeProcessorStoreV2(isOverflowed, lastUpdateTimeMap,
-        sequenceFileList, unsequenceFileList, fileNodeProcessorStatus, numOfMergeFile);
-  }
-
-  public boolean isOverflowed() {
-    return isOverflowed;
-  }
-
-  public void setOverflowed(boolean isOverflowed) {
-    this.isOverflowed = isOverflowed;
-  }
-
-  public FileNodeProcessorStatus getFileNodeProcessorStatus() {
-    return fileNodeProcessorStatus;
-  }
-
-  public void setFileNodeProcessorStatus(FileNodeProcessorStatus fileNodeProcessorStatus) {
-    this.fileNodeProcessorStatus = fileNodeProcessorStatus;
-  }
-
-  public Map<String, Long> getLatestTimeMap() {
-    return new HashMap<>(latestTimeMap);
-  }
-
-  public void setLatestTimeMap(Map<String, Long> latestTimeMap) {
-    this.latestTimeMap = latestTimeMap;
-  }
-
-  public List<TsFileResourceV2> getSequenceFileList() {
-    return sequenceFileList;
-  }
-
-  public void setSequenceFileList(List<TsFileResourceV2> sequenceFileList) {
-    this.sequenceFileList = sequenceFileList;
-  }
-
-  public List<TsFileResourceV2> getUnSequenceFileList() {
-    return unSequenceFileList;
-  }
-
-  public int getNumOfMergeFile() {
-    return numOfMergeFile;
-  }
-
-  public void setNumOfMergeFile(int numOfMergeFile) {
-    this.numOfMergeFile = numOfMergeFile;
-  }
-
-  public void setUnSequenceFileList(
-      List<TsFileResourceV2> unSequenceFileList) {
-    this.unSequenceFileList = unSequenceFileList;
-  }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index fee51dc..6dedfa8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -19,12 +19,11 @@
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -34,7 +33,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.UnsealedTsFileProcessorV2;
 import org.apache.iotdb.db.engine.filenode.CopyOnReadLinkedList;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
 import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSourceV2;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -67,19 +65,19 @@ public class FileNodeProcessorV2 {
   private FileSchema fileSchema;
 
   // includes sealed and unsealed sequnce tsfiles
-  private List<TsFileResourceV2> sequenceFileList;
+  private List<TsFileResourceV2> sequenceFileList = new ArrayList<>();
   private UnsealedTsFileProcessorV2 workUnsealedSequenceTsFileProcessor = null;
   private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
 
   // includes sealed and unsealed unsequnce tsfiles
-  private List<TsFileResourceV2> unSequenceFileList;
+  private List<TsFileResourceV2> unSequenceFileList = new ArrayList<>();
   private UnsealedTsFileProcessorV2 workUnsealedUnSequenceTsFileProcessor = null;
   private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
 
   /**
    * device -> global latest timestamp of each device
    */
-  private Map<String, Long> latestTimeForEachDevice;
+  private Map<String, Long> latestTimeForEachDevice = new HashMap<>();
 
   /**
    * device -> largest timestamp of the latest memtable to be submitted to asyncFlush
@@ -92,14 +90,6 @@ public class FileNodeProcessorV2 {
 
   private VersionController versionController;
 
-  // TODO delete the file path
-  private String absoluteFileNodeRestoreFilePath;
-
-  private FileNodeProcessorStoreV2 fileNodeProcessorStore;
-
-  // TODO delete this lock
-  private final Object fileNodeRestoreLock = new Object();
-
   public FileNodeProcessorV2(String absoluteBaseDir, String storageGroupName)
       throws FileNodeProcessorException {
     this.storageGroupName = storageGroupName;
@@ -122,20 +112,9 @@ public class FileNodeProcessorV2 {
           "directory {}", storageGroupName, restoreFolder.getAbsolutePath());
     }
 
-    absoluteFileNodeRestoreFilePath = new File(restoreFolder, storageGroupName + RESTORE_FILE_SUFFIX).getAbsolutePath();
-
-    try {
-      fileNodeProcessorStore = readStoreFromDiskOrCreate();
-    } catch (FileNodeProcessorException e) {
-      LOGGER.error("The fileNode processor {} encountered an error when recovering restore " +
-          "information.", storageGroupName);
-      throw new FileNodeProcessorException(e);
-    }
+    String absoluteFileNodeRestoreFilePath = new File(restoreFolder, storageGroupName + RESTORE_FILE_SUFFIX).getAbsolutePath();
 
-    // TODO deep clone the lastupdate time, change the getSequenceFileList to V2
-    sequenceFileList = fileNodeProcessorStore.getSequenceFileList();
-    unSequenceFileList = fileNodeProcessorStore.getUnSequenceFileList();
-    latestTimeForEachDevice = fileNodeProcessorStore.getLatestTimeMap();
+    recovery();
 
     /**
      * version controller
@@ -150,6 +129,10 @@ public class FileNodeProcessorV2 {
     this.fileSchema = constructFileSchema(storageGroupName);
   }
 
+  // TODO: Jiang Tian
+  private void recovery(){
+  }
+
   private FileSchema constructFileSchema(String storageGroupName) {
     List<MeasurementSchema> columnSchemaList;
     columnSchemaList = mManager.getSchemaForFileName(storageGroupName);
@@ -177,44 +160,6 @@ public class FileNodeProcessorV2 {
     }
   }
 
-
-  /**
-   * read file node store from disk or create a new one
-   */
-  private FileNodeProcessorStoreV2 readStoreFromDiskOrCreate() throws FileNodeProcessorException {
-
-    synchronized (fileNodeRestoreLock) {
-      File restoreFile = new File(absoluteFileNodeRestoreFilePath);
-      if (!restoreFile.exists() || restoreFile.length() == 0) {
-        return new FileNodeProcessorStoreV2(false, new HashMap<>(),
-            new ArrayList<>(), new ArrayList<>(), FileNodeProcessorStatus.NONE, 0);
-      }
-      try (FileInputStream inputStream = new FileInputStream(absoluteFileNodeRestoreFilePath)) {
-        return FileNodeProcessorStoreV2.deSerialize(inputStream);
-      } catch (IOException e) {
-        LOGGER
-            .error("Failed to deserialize the FileNodeRestoreFile {}, {}",
-                absoluteFileNodeRestoreFilePath,
-                e);
-        throw new FileNodeProcessorException(e);
-      }
-    }
-  }
-
-  private void writeStoreToDisk(FileNodeProcessorStoreV2 fileNodeProcessorStore)
-      throws FileNodeProcessorException {
-
-    synchronized (fileNodeRestoreLock) {
-      try (FileOutputStream fileOutputStream = new FileOutputStream(absoluteFileNodeRestoreFilePath)) {
-        fileNodeProcessorStore.serialize(fileOutputStream);
-        LOGGER.debug("The filenode processor {} writes restore information to the restore file",
-            storageGroupName);
-      } catch (IOException e) {
-        throw new FileNodeProcessorException(e);
-      }
-    }
-  }
-
   public boolean insert(TSRecord tsRecord) {
     lock.writeLock().lock();
     boolean result;
@@ -247,16 +192,18 @@ public class FileNodeProcessorV2 {
     if (unsealedTsFileProcessor == null) {
       if (sequence) {
         String baseDir = directories.getNextFolderForTsfile();
-        String filePath = Paths.get(baseDir, storageGroupName, tsRecord.time + "").toString();
-        unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
-            fileSchema, versionController, this::closeUnsealedTsFileProcessorCallBack);
+        String filePath = Paths.get(baseDir, storageGroupName, System.currentTimeMillis() + "").toString();
+        unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
+            new File(filePath),
+            fileSchema, versionController, this::closeUnsealedTsFileProcessor);
         sequenceFileList.add(unsealedTsFileProcessor.getTsFileResource());
       } else {
         // TODO check if the disk is full
         String baseDir = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDir();
-        String filePath = Paths.get(baseDir, storageGroupName, tsRecord.time + "").toString();
-        unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
-            fileSchema, versionController, this::closeUnsealedTsFileProcessorCallBack);
+        String filePath = Paths.get(baseDir, storageGroupName, System.currentTimeMillis() + "").toString();
+        unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
+            new File(filePath),
+            fileSchema, versionController, this::closeUnsealedTsFileProcessor);
         unSequenceFileList.add(unsealedTsFileProcessor.getTsFileResource());
       }
     }
@@ -356,29 +303,17 @@ public class FileNodeProcessorV2 {
    * put the memtable back to the MemTablePool and make the metadata in writer visible
    */
   // TODO please consider concurrency with query and write method.
-  private void closeUnsealedTsFileProcessorCallBack(UnsealedTsFileProcessorV2 bufferWriteProcessor) {
+  private void closeUnsealedTsFileProcessor(UnsealedTsFileProcessorV2 bufferWriteProcessor) {
     closingSequenceTsFileProcessor.remove(bufferWriteProcessor);
-    synchronized (fileNodeProcessorStore) {
-      fileNodeProcessorStore.setLatestTimeMap(latestTimeForEachDevice);
-
-      if (!sequenceFileList.isEmpty()) {
-        // end time with one start time
-        Map<String, Long> endTimeMap = new HashMap<>();
-        TsFileResourceV2 resource = workUnsealedSequenceTsFileProcessor.getTsFileResource();
-        synchronized (resource) {
-          for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
-            String deviceId = startTime.getKey();
-            endTimeMap.put(deviceId, latestTimeForEachDevice.get(deviceId));
-          }
-          resource.setEndTimeMap(endTimeMap);
-        }
-      }
-      fileNodeProcessorStore.setSequenceFileList(sequenceFileList);
-      try {
-        writeStoreToDisk(fileNodeProcessorStore);
-      } catch (FileNodeProcessorException e) {
-        LOGGER.error("write FileNodeStore info error, because {}", e.getMessage(), e);
+    // end time with one start time
+    Map<String, Long> endTimeMap = new HashMap<>();
+    TsFileResourceV2 resource = workUnsealedSequenceTsFileProcessor.getTsFileResource();
+    synchronized (resource) {
+      for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
+        String deviceId = startTime.getKey();
+        endTimeMap.put(deviceId, latestTimeForEachDevice.get(deviceId));
       }
+      resource.setEndTimeMap(endTimeMap);
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java
deleted file mode 100644
index 1706203..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memtable;
-
-public interface Callback<T> {
-
-  void call(T object);
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index a5e229a..621b5fc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -43,11 +43,11 @@ public class MemTableFlushTaskV2 {
   private boolean stop = false;
   private String processorName;
 
-  private Callback<IMemTable> flushCallBack;
+  private Consumer<IMemTable> flushCallBack;
   private IMemTable memTable;
 
   public MemTableFlushTaskV2(NativeRestorableIOWriter writer, String processorName,
-      Callback<IMemTable> callBack) {
+      Consumer<IMemTable> callBack) {
     this.tsFileIoWriter = writer;
     this.processorName = processorName;
     this.flushCallBack = callBack;
@@ -135,7 +135,7 @@ public class MemTableFlushTaskV2 {
     LOGGER.info("Processor {} return back a memtable to MemTablePool", processorName);
 
     tsFileIoWriter.makeMetadataVisible();
-    flushCallBack.call(memTable);
+    flushCallBack.accept(memTable);
   });
 
 


[incubator-iotdb] 03/03: Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit f28b6716b47d65efd2be896e3b57e19bdf7e7c37
Merge: e89b1b6 8a0c12f
Author: lta <li...@163.com>
AuthorDate: Thu Jun 20 16:33:01 2019 +0800

    Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile

 .../engine/bufferwrite/BufferWriteProcessor.java   | 18 ++---
 .../db/engine/filenode/FileNodeProcessor.java      | 18 +++--
 .../db/engine/overflow/io/OverflowProcessor.java   | 20 ++----
 .../db/engine/overflow/io/OverflowResource.java    | 16 ++++-
 .../org/apache/iotdb/db/writelog/LogPosition.java  | 26 -------
 .../org/apache/iotdb/db/writelog/RecoverStage.java | 50 -------------
 .../apache/iotdb/db/writelog/io/ILogReader.java    | 21 +++++-
 .../apache/iotdb/db/writelog/io/ILogWriter.java    | 20 ++++++
 .../org/apache/iotdb/db/writelog/io/LogWriter.java | 24 ++++---
 .../iotdb/db/writelog/io/MultiFileLogReader.java   |  4 ++
 .../iotdb/db/writelog/io/SingleFileLogReader.java  | 21 ++++--
 .../writelog/manager/MultiFileLogNodeManager.java  | 47 +++---------
 .../db/writelog/manager/WriteLogNodeManager.java   | 20 ++----
 .../db/writelog/node/ExclusiveWriteLogNode.java    | 83 +++++++---------------
 .../iotdb/db/writelog/node/WriteLogNode.java       | 32 +++++----
 .../iotdb/db/writelog/recover/LogReplayer.java     | 26 ++++---
 ...rformer.java => SeqTsFileRecoverPerformer.java} | 61 ++++++++++------
 .../recover/UnseqTsFileRecoverPerformer.java       | 74 +++++++++++++++++++
 .../org/apache/iotdb/db/writelog/RecoverTest.java  |  7 +-
 19 files changed, 299 insertions(+), 289 deletions(-)