Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/10/26 11:34:34 UTC

[GitHub] [iotdb] neuyilan opened a new pull request #1865: [IOTDB-915]Add raft log persist mechanism and use persist log to catch up

neuyilan opened a new pull request #1865:
URL: https://github.com/apache/iotdb/pull/1865


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on a change in pull request #1865: [IOTDB-915]Add raft log persist mechanism and use persist log to catch up

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #1865:
URL: https://github.com/apache/iotdb/pull/1865#discussion_r514724928



##########
File path: cluster/src/assembly/resources/conf/iotdb-cluster.properties
##########
@@ -129,7 +129,26 @@ flush_raft_log_threshold=10000
 # The cycle when raft log is periodically forced to be written to disk(in milliseconds)
 # If force_raft_log_period_in_ms = 0 it means force insert raft log to be written to disk after
 # each refreshment. Set this parameter to 0 may slow down the ingestion on slow disk.
-force_raft_log_period_in_ms=10
+force_raft_log_period_in_ms=1000
 
 # Size of log buffer in each RaftMember's LogManager(in byte).
-raft_log_buffer_size=16777216
\ No newline at end of file
+raft_log_buffer_size=16777216
+
+# The maximum value of the raft log index stored in the memory per raft group,
+# These indexes are used to index the location of the log on the disk
+max_raft_log_index_size_in_memory=10000
+
+# The maximum value of the raft log persisted on disk per file(in byte) per raft group
+max_raft_log_persist_data_size_per_file=1073741824
+
+# The maximum number of persistent raft log files on disk per raft group, So each raft group's

Review comment:
       Please fix the typo here (the duplicated "So each raft group's").

##########
File path: cluster/src/assembly/resources/conf/iotdb-cluster.properties
##########
@@ -129,7 +129,26 @@ flush_raft_log_threshold=10000
 # The cycle when raft log is periodically forced to be written to disk(in milliseconds)
 # If force_raft_log_period_in_ms = 0 it means force insert raft log to be written to disk after
 # each refreshment. Set this parameter to 0 may slow down the ingestion on slow disk.
-force_raft_log_period_in_ms=10
+force_raft_log_period_in_ms=1000
 
 # Size of log buffer in each RaftMember's LogManager(in byte).
-raft_log_buffer_size=16777216
\ No newline at end of file
+raft_log_buffer_size=16777216
+
+# The maximum value of the raft log index stored in the memory per raft group,
+# These indexes are used to index the location of the log on the disk
+max_raft_log_index_size_in_memory=10000
+
+# The maximum value of the raft log persisted on disk per file(in byte) per raft group

Review comment:
       It's better to add a comment noting the default, such as 'default: 1G'.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -203,47 +256,113 @@ public void append(List<Log> entries) throws IOException {
    */
   private void putLogs(List<Log> entries) {
     for (Log log : entries) {
-      logBuffer.mark();
+      logDataBuffer.mark();
+      logIndexBuffer.mark();
       ByteBuffer logData = log.serialize();
       int size = logData.capacity() + Integer.BYTES;
       try {
-        logBuffer.putInt(logData.capacity());
-        logBuffer.put(logData);
-        logSizeDeque.addLast(size);
-        bufferedLogNum++;
+        logDataBuffer.putInt(logData.capacity());
+        logDataBuffer.put(logData);
+        logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
+        logIndexOffsetList.add(offsetOfTheCurrentLogDataOutputStream);
+        offsetOfTheCurrentLogDataOutputStream += size;
       } catch (BufferOverflowException e) {
         logger.info("Raft log buffer overflow!");
-        logBuffer.reset();
+        logDataBuffer.reset();
+        logIndexBuffer.reset();
         flushLogBuffer();
-        logBuffer.putInt(logData.capacity());
-        logBuffer.put(logData);
-        logSizeDeque.addLast(size);
-        bufferedLogNum++;
+        checkCloseCurrentFile(log.getCurrLogIndex() - 1);
+        logDataBuffer.putInt(logData.capacity());
+        logDataBuffer.put(logData);
+        logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
+        logIndexOffsetList.add(offsetOfTheCurrentLogDataOutputStream);
+        offsetOfTheCurrentLogDataOutputStream += size;
+      }
+    }
+  }
+
+  private void checkCloseCurrentFile(long commitIndex) {
+    if (offsetOfTheCurrentLogDataOutputStream > maxRaftLogPersistDataSizePerFile) {
+      try {
+        closeCurrentFile(commitIndex);
+        serializeMeta(meta);
+        createNewLogFile(logDir, commitIndex + 1);
+      } catch (IOException e) {
+        logger.error("check close current file failed", e);
       }
     }
   }
 
+  private void closeCurrentFile(long commitIndex) throws IOException {
+    lock.writeLock().lock();

Review comment:
       It seems all the callers (`close()`, `checkCloseCurrentFile()`) of this function have already acquired the write lock?
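       For context, a minimal sketch (not from the PR) of what is at stake: `ReentrantReadWriteLock`'s write lock is reentrant, so the inner `lock()` still succeeds when the caller already holds it, but every acquisition must be paired with an `unlock()`:
       ```java
       import java.util.concurrent.locks.ReentrantReadWriteLock;

       public class ReentrancyDemo {
         private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

         public void caller() {          // like close()/checkCloseCurrentFile()
           lock.writeLock().lock();      // first acquisition
           try {
             callee();
           } finally {
             lock.writeLock().unlock();
           }
         }

         private void callee() {         // like closeCurrentFile()
           lock.writeLock().lock();      // reentrant acquisition, hold count = 2
           try {
             // critical section
           } finally {
             lock.writeLock().unlock();  // caller still holds the lock afterwards
           }
         }
       }
       ```
       So the inner lock is redundant but harmless, as long as it is always released in a `finally` block.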

##########
File path: cluster/src/assembly/resources/conf/iotdb-cluster.properties
##########
@@ -129,7 +129,26 @@ flush_raft_log_threshold=10000
 # The cycle when raft log is periodically forced to be written to disk(in milliseconds)
 # If force_raft_log_period_in_ms = 0 it means force insert raft log to be written to disk after
 # each refreshment. Set this parameter to 0 may slow down the ingestion on slow disk.
-force_raft_log_period_in_ms=10
+force_raft_log_period_in_ms=1000
 

Review comment:
       It seems that the `max_unsnapshoted_log_size` parameter can be deleted now.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -50,82 +53,140 @@
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SyncLogDequeSerializer implements StableEntryManager {
 
   private static final Logger logger = LoggerFactory.getLogger(SyncLogDequeSerializer.class);
-  private static final String LOG_FILE_PREFIX = ".data";
+  private static final String LOG_DATA_FILE_SUFFIX = "data";
+  private static final String LOG_INDEX_FILE_SUFFIX = "idx";
+
+  /**
+   * the log data files
+   */
+  private List<File> logDataFileList;
+
+  /**
+   * the log index files
+   */
+  private List<File> logIndexFileList;
 
-  List<File> logFileList;
   private LogParser parser = LogParser.getINSTANCE();
   private File metaFile;
-  private FileOutputStream currentLogOutputStream;
-  private Deque<Integer> logSizeDeque = new ArrayDeque<>();
+  private FileOutputStream currentLogDataOutputStream;
+  private FileOutputStream currentLogIndexOutputStream;
   private LogManagerMeta meta;
   private HardState state;
-  // mark first log position
-  private long firstLogPosition = 0;
-  // removed log size
-  private long removedLogSize = 0;
-  // when the removedLogSize larger than this, we actually delete logs
-  private long maxRemovedLogSize = ClusterDescriptor.getInstance().getConfig()
-      .getMaxUnsnapshotLogSize();
-  // min version of available log
+
+  /**
+   * min version of available log
+   */
   private long minAvailableVersion = 0;
-  // max version of available log
+
+  /**
+   * max version of available log
+   */
   private long maxAvailableVersion = Long.MAX_VALUE;
-  // log dir
+
   private String logDir;
-  // version controller
+
   private VersionController versionController;
 
-  private ByteBuffer logBuffer = ByteBuffer
+  private ByteBuffer logDataBuffer = ByteBuffer
       .allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+  private ByteBuffer logIndexBuffer = ByteBuffer
+      .allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+
+  private long offsetOfTheCurrentLogDataOutputStream = 0;
+
+  /**
+   * file name pattern:
+   * <p>
+   * for log data file: ${startTime}-${Long.MAX_VALUE}-{version}-data
+   * <p>
+   * for log index file: ${startTime}-${Long.MAX_VALUE}-{version}-idx
+   */
+  private static final int FILE_NAME_PART_LENGTH = 4;
 
-  private final int flushRaftLogThreshold = ClusterDescriptor.getInstance().getConfig()
-      .getFlushRaftLogThreshold();
+  private int maxRaftLogIndexSizeInMemory = ClusterDescriptor.getInstance().getConfig()
+      .getMaxRaftLogIndexSizeInMemory();
 
-  private int bufferedLogNum = 0;
+  private int maxRaftLogPersistDataSizePerFile = ClusterDescriptor.getInstance().getConfig()
+      .getMaxRaftLogPersistDataSizePerFile();
 
+  private int maxNumberOfPersistRaftLogFiles = ClusterDescriptor.getInstance().getConfig()
+      .getMaxNumberOfPersistRaftLogFiles();
+
+  private int maxPersistRaftLogNumberOnDisk = ClusterDescriptor.getInstance().getConfig()
+      .getMaxPersistRaftLogNumberOnDisk();
+
+  private ScheduledExecutorService persistLogDeleteExecutorService;
+  private ScheduledFuture<?> persistLogDeleteLogFuture;
+
+  /**
+   * indicate the first raft log's index of {@link SyncLogDequeSerializer#logIndexOffsetList}, for
+   * example, if firstLogIndex=1000, then the offset of the log index 1000 equals
+   * logIndexOffsetList[0], the offset of the log index 1001 equals logIndexOffsetList[1], and so
+   * on.
+   */
+  private long firstLogIndex = 0;
+
+  /**
+   * the offset of the log's index, for example, the first value is the offset of index
+   * ${firstLogIndex}, the second value is the offset of index ${firstLogIndex+1}
+   */
+  private List<Long> logIndexOffsetList;

Review comment:
       As the default max size of each data file is currently 1 GB, it seems an Integer would be enough to record the offset.
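       The arithmetic behind this, assuming the 1 GB default of `max_raft_log_persist_data_size_per_file`:
       ```java
       // max_raft_log_persist_data_size_per_file defaults to 1073741824 (2^30),
       // which fits comfortably in an int (Integer.MAX_VALUE = 2^31 - 1):
       long maxOffsetPerFile = 1073741824L;
       assert maxOffsetPerFile <= Integer.MAX_VALUE;
       // so logIndexOffsetList could hold int offsets, roughly halving both the
       // in-memory index size and the idx file size
       ```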

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
##########
@@ -292,15 +292,29 @@ public long getLastLogIndex() {
   public long getTerm(long index) throws EntryCompactedException {
     long dummyIndex = getFirstIndex() - 1;
     if (index < dummyIndex) {
+      // search in disk

Review comment:
       I think this code should be in the `maybeTerm` function of `CommittedEntryManager`, which should manage all persisted entries; it would either throw an `EntryCompactedException` or get the log from disk when `isEnableRaftLogPersistence` is enabled.
   BTW, I'm a little confused about whether we should get the log from disk in `getTerm`. If so, then maybe we should rename `isEnableUsePersistLogOnDiskToCatchUp`.
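       A rough sketch of the suggested placement (the accessors and the exception constructor are assumptions, not the PR's actual code):
       ```java
       // Hypothetical maybeTerm() inside CommittedEntryManager:
       public long maybeTerm(long index) throws EntryCompactedException {
         long dummyIndex = getDummyIndex();                      // assumed accessor
         if (index < dummyIndex) {
           if (config.isEnableRaftLogPersistence()) {
             // fall back to the raft log persisted on disk
             List<Log> logs = stableEntryManager.getLogs(index, index);
             if (!logs.isEmpty()) {
               return logs.get(0).getCurrLogTerm();
             }
           }
           throw new EntryCompactedException(index, dummyIndex); // constructor assumed
         }
         return getEntries(index, index + 1).get(0).getCurrLogTerm();
       }
       ```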

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
##########
@@ -127,6 +127,32 @@
 
   private int pullSnapshotRetryIntervalMs = 5 * 1000;
 
+  /**
+   * The maximum value of the raft log index stored in the memory per raft group, These indexes are
+   * used to index the location of the log on the disk
+   */
+  private int maxRaftLogIndexSizeInMemory = 10000;
+
+  /**
+   * The maximum value of the raft log persisted on disk per file(in byte) per raft group

Review comment:
       same as above

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -50,82 +53,140 @@
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SyncLogDequeSerializer implements StableEntryManager {
 
   private static final Logger logger = LoggerFactory.getLogger(SyncLogDequeSerializer.class);
-  private static final String LOG_FILE_PREFIX = ".data";
+  private static final String LOG_DATA_FILE_SUFFIX = "data";
+  private static final String LOG_INDEX_FILE_SUFFIX = "idx";
+
+  /**
+   * the log data files
+   */
+  private List<File> logDataFileList;
+
+  /**
+   * the log index files
+   */
+  private List<File> logIndexFileList;
 
-  List<File> logFileList;
   private LogParser parser = LogParser.getINSTANCE();
   private File metaFile;
-  private FileOutputStream currentLogOutputStream;
-  private Deque<Integer> logSizeDeque = new ArrayDeque<>();
+  private FileOutputStream currentLogDataOutputStream;
+  private FileOutputStream currentLogIndexOutputStream;
   private LogManagerMeta meta;
   private HardState state;
-  // mark first log position
-  private long firstLogPosition = 0;
-  // removed log size
-  private long removedLogSize = 0;
-  // when the removedLogSize larger than this, we actually delete logs
-  private long maxRemovedLogSize = ClusterDescriptor.getInstance().getConfig()
-      .getMaxUnsnapshotLogSize();
-  // min version of available log
+
+  /**
+   * min version of available log
+   */
   private long minAvailableVersion = 0;
-  // max version of available log
+
+  /**
+   * max version of available log
+   */
   private long maxAvailableVersion = Long.MAX_VALUE;
-  // log dir
+
   private String logDir;
-  // version controller
+
   private VersionController versionController;
 
-  private ByteBuffer logBuffer = ByteBuffer
+  private ByteBuffer logDataBuffer = ByteBuffer
       .allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+  private ByteBuffer logIndexBuffer = ByteBuffer
+      .allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+
+  private long offsetOfTheCurrentLogDataOutputStream = 0;
+
+  /**
+   * file name pattern:
+   * <p>
+   * for log data file: ${startTime}-${Long.MAX_VALUE}-{version}-data
+   * <p>
+   * for log index file: ${startTime}-${Long.MAX_VALUE}-{version}-idx
+   */
+  private static final int FILE_NAME_PART_LENGTH = 4;
 
-  private final int flushRaftLogThreshold = ClusterDescriptor.getInstance().getConfig()
-      .getFlushRaftLogThreshold();
+  private int maxRaftLogIndexSizeInMemory = ClusterDescriptor.getInstance().getConfig()
+      .getMaxRaftLogIndexSizeInMemory();
 
-  private int bufferedLogNum = 0;
+  private int maxRaftLogPersistDataSizePerFile = ClusterDescriptor.getInstance().getConfig()
+      .getMaxRaftLogPersistDataSizePerFile();
 
+  private int maxNumberOfPersistRaftLogFiles = ClusterDescriptor.getInstance().getConfig()
+      .getMaxNumberOfPersistRaftLogFiles();
+
+  private int maxPersistRaftLogNumberOnDisk = ClusterDescriptor.getInstance().getConfig()
+      .getMaxPersistRaftLogNumberOnDisk();
+
+  private ScheduledExecutorService persistLogDeleteExecutorService;
+  private ScheduledFuture<?> persistLogDeleteLogFuture;
+
+  /**
+   * indicate the first raft log's index of {@link SyncLogDequeSerializer#logIndexOffsetList}, for
+   * example, if firstLogIndex=1000, then the offset of the log index 1000 equals
+   * logIndexOffsetList[0], the offset of the log index 1001 equals logIndexOffsetList[1], and so
+   * on.
+   */
+  private long firstLogIndex = 0;
+
+  /**
+   * the offset of the log's index, for example, the first value is the offset of index
+   * ${firstLogIndex}, the second value is the offset of index ${firstLogIndex+1}
+   */
+  private List<Long> logIndexOffsetList;
+
+  private static final int logDeleteCheckIntervalSecond = 1;
 
   /**
    * the lock uses when change the logSizeDeque

Review comment:
       This comment still says "change the `logSizeDeque`", but `logSizeDeque` has been removed in this PR.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -50,82 +53,140 @@
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SyncLogDequeSerializer implements StableEntryManager {
 
   private static final Logger logger = LoggerFactory.getLogger(SyncLogDequeSerializer.class);
-  private static final String LOG_FILE_PREFIX = ".data";
+  private static final String LOG_DATA_FILE_SUFFIX = "data";
+  private static final String LOG_INDEX_FILE_SUFFIX = "idx";
+
+  /**
+   * the log data files
+   */
+  private List<File> logDataFileList;
+
+  /**
+   * the log index files
+   */
+  private List<File> logIndexFileList;
 
-  List<File> logFileList;
   private LogParser parser = LogParser.getINSTANCE();
   private File metaFile;
-  private FileOutputStream currentLogOutputStream;
-  private Deque<Integer> logSizeDeque = new ArrayDeque<>();
+  private FileOutputStream currentLogDataOutputStream;
+  private FileOutputStream currentLogIndexOutputStream;
   private LogManagerMeta meta;
   private HardState state;
-  // mark first log position
-  private long firstLogPosition = 0;
-  // removed log size
-  private long removedLogSize = 0;
-  // when the removedLogSize larger than this, we actually delete logs
-  private long maxRemovedLogSize = ClusterDescriptor.getInstance().getConfig()
-      .getMaxUnsnapshotLogSize();
-  // min version of available log
+
+  /**
+   * min version of available log
+   */
   private long minAvailableVersion = 0;
-  // max version of available log
+
+  /**
+   * max version of available log
+   */
   private long maxAvailableVersion = Long.MAX_VALUE;
-  // log dir
+
   private String logDir;
-  // version controller
+
   private VersionController versionController;
 
-  private ByteBuffer logBuffer = ByteBuffer
+  private ByteBuffer logDataBuffer = ByteBuffer
       .allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+  private ByteBuffer logIndexBuffer = ByteBuffer
+      .allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+
+  private long offsetOfTheCurrentLogDataOutputStream = 0;
+
+  /**
+   * file name pattern:
+   * <p>
+   * for log data file: ${startTime}-${Long.MAX_VALUE}-{version}-data
+   * <p>
+   * for log index file: ${startTime}-${Long.MAX_VALUE}-{version}-idx
+   */
+  private static final int FILE_NAME_PART_LENGTH = 4;
 
-  private final int flushRaftLogThreshold = ClusterDescriptor.getInstance().getConfig()
-      .getFlushRaftLogThreshold();
+  private int maxRaftLogIndexSizeInMemory = ClusterDescriptor.getInstance().getConfig()
+      .getMaxRaftLogIndexSizeInMemory();
 
-  private int bufferedLogNum = 0;
+  private int maxRaftLogPersistDataSizePerFile = ClusterDescriptor.getInstance().getConfig()
+      .getMaxRaftLogPersistDataSizePerFile();
 
+  private int maxNumberOfPersistRaftLogFiles = ClusterDescriptor.getInstance().getConfig()
+      .getMaxNumberOfPersistRaftLogFiles();
+
+  private int maxPersistRaftLogNumberOnDisk = ClusterDescriptor.getInstance().getConfig()
+      .getMaxPersistRaftLogNumberOnDisk();
+
+  private ScheduledExecutorService persistLogDeleteExecutorService;
+  private ScheduledFuture<?> persistLogDeleteLogFuture;
+
+  /**
+   * indicate the first raft log's index of {@link SyncLogDequeSerializer#logIndexOffsetList}, for
+   * example, if firstLogIndex=1000, then the offset of the log index 1000 equals
+   * logIndexOffsetList[0], the offset of the log index 1001 equals logIndexOffsetList[1], and so
+   * on.
+   */
+  private long firstLogIndex = 0;
+
+  /**
+   * the offset of the log's index, for example, the first value is the offset of index
+   * ${firstLogIndex}, the second value is the offset of index ${firstLogIndex+1}
+   */
+  private List<Long> logIndexOffsetList;
+
+  private static final int logDeleteCheckIntervalSecond = 1;

Review comment:
       As `checkDeletePersistRaftLog()` acquires the write lock, I suspect an interval of `1` second may be too small?
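       For reference, the constant presumably drives a periodic task wired up like this (the scheduling call itself is a sketch, not copied from the PR):
       ```java
       // A larger or configurable interval would reduce write-lock contention
       // between the delete task and putLogs()/flushLogBuffer():
       persistLogDeleteLogFuture = persistLogDeleteExecutorService.scheduleAtFixedRate(
           this::checkDeletePersistRaftLog,
           logDeleteCheckIntervalSecond,
           logDeleteCheckIntervalSecond,
           TimeUnit.SECONDS);
       ```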

##########
File path: cluster/src/assembly/resources/conf/iotdb-cluster.properties
##########
@@ -129,7 +129,26 @@ flush_raft_log_threshold=10000
 # The cycle when raft log is periodically forced to be written to disk(in milliseconds)
 # If force_raft_log_period_in_ms = 0 it means force insert raft log to be written to disk after
 # each refreshment. Set this parameter to 0 may slow down the ingestion on slow disk.
-force_raft_log_period_in_ms=10
+force_raft_log_period_in_ms=1000
 
 # Size of log buffer in each RaftMember's LogManager(in byte).
-raft_log_buffer_size=16777216
\ No newline at end of file
+raft_log_buffer_size=16777216
+
+# The maximum value of the raft log index stored in the memory per raft group,
+# These indexes are used to index the location of the log on the disk
+max_raft_log_index_size_in_memory=10000
+
+# The maximum value of the raft log persisted on disk per file(in byte) per raft group
+max_raft_log_persist_data_size_per_file=1073741824
+
+# The maximum number of persistent raft log files on disk per raft group, So each raft group's
+# So each raft group's log takes up disk space approximately equals
+# max_raft_log_persist_data_size_per_file*max_number_of_persist_raft_log_files
+max_number_of_persist_raft_log_files=5
+
+# The maximum number of logs saved on the disk
+max_persist_raft_log_number_on_disk=1000000

Review comment:
       I doubt this parameter is necessary, given that you have already bounded disk usage with two parameters: `max_number_of_persist_raft_log_files` and `max_raft_log_persist_data_size_per_file`.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -50,82 +53,140 @@
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SyncLogDequeSerializer implements StableEntryManager {
 
   private static final Logger logger = LoggerFactory.getLogger(SyncLogDequeSerializer.class);
-  private static final String LOG_FILE_PREFIX = ".data";
+  private static final String LOG_DATA_FILE_SUFFIX = "data";
+  private static final String LOG_INDEX_FILE_SUFFIX = "idx";
+
+  /**
+   * the log data files
+   */
+  private List<File> logDataFileList;
+
+  /**
+   * the log index files
+   */
+  private List<File> logIndexFileList;
 
-  List<File> logFileList;
   private LogParser parser = LogParser.getINSTANCE();
   private File metaFile;
-  private FileOutputStream currentLogOutputStream;
-  private Deque<Integer> logSizeDeque = new ArrayDeque<>();
+  private FileOutputStream currentLogDataOutputStream;
+  private FileOutputStream currentLogIndexOutputStream;
   private LogManagerMeta meta;
   private HardState state;
-  // mark first log position
-  private long firstLogPosition = 0;
-  // removed log size
-  private long removedLogSize = 0;
-  // when the removedLogSize larger than this, we actually delete logs
-  private long maxRemovedLogSize = ClusterDescriptor.getInstance().getConfig()
-      .getMaxUnsnapshotLogSize();
-  // min version of available log
+
+  /**
+   * min version of available log
+   */
   private long minAvailableVersion = 0;
-  // max version of available log
+
+  /**
+   * max version of available log
+   */
   private long maxAvailableVersion = Long.MAX_VALUE;
-  // log dir
+
   private String logDir;
-  // version controller
+
   private VersionController versionController;
 
-  private ByteBuffer logBuffer = ByteBuffer
+  private ByteBuffer logDataBuffer = ByteBuffer
       .allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+  private ByteBuffer logIndexBuffer = ByteBuffer

Review comment:
       As you already use `logIndexOffsetList` to record offsets in `putLog`, I doubt whether it's necessary to also maintain this buffer in `putLog`. Maybe you can generate the buffer in `flushLogBuffer`, as sketched below.
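       A minimal sketch of that idea (the helper name and the `fromIdx` bookkeeping are assumptions):
       ```java
       // Build the index buffer once at flush time from the offsets already kept
       // in memory, instead of also writing logIndexBuffer on every putLogs() call:
       private ByteBuffer buildIndexBuffer(List<Long> offsets, int fromIdx) {
         ByteBuffer buf = ByteBuffer.allocate((offsets.size() - fromIdx) * Long.BYTES);
         for (int i = fromIdx; i < offsets.size(); i++) {
           buf.putLong(offsets.get(i));
         }
         buf.flip(); // ready to be written to currentLogIndexOutputStream
         return buf;
       }
       ```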

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -160,31 +216,28 @@ public LogManagerMeta getMeta() {
    * Recover all the logs in disk. This function will be called once this instance is created.
    */
   @Override
-  public List<Log> getAllEntries() {
-    List<Log> logs = recoverLog();
-    int size = logs.size();
-    if (size != 0 && meta.getLastLogIndex() <= logs.get(size - 1).getCurrLogIndex()) {
-      meta.setLastLogTerm(logs.get(size - 1).getCurrLogTerm());
-      meta.setLastLogIndex(logs.get(size - 1).getCurrLogIndex());
-      meta.setCommitLogTerm(logs.get(size - 1).getCurrLogTerm());
-      meta.setCommitLogIndex(logs.get(size - 1).getCurrLogIndex());
+  public List<Log> getAllEntriesBeforeAppliedIndex() {

Review comment:
       It seems that this function will return an empty list when `maxHaveAppliedCommitIndex == commitLogIndex`. So how can we handle the redo log on restart, given that we have not merged the WAL and the raft logs?







[GitHub] [iotdb] LebronAl commented on a change in pull request #1865: [IOTDB-915]Add raft log persist mechanism and use persist log to catch up

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #1865:
URL: https://github.com/apache/iotdb/pull/1865#discussion_r514974862



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -50,82 +53,140 @@
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SyncLogDequeSerializer implements StableEntryManager {
 
   private static final Logger logger = LoggerFactory.getLogger(SyncLogDequeSerializer.class);
-  private static final String LOG_FILE_PREFIX = ".data";
+  private static final String LOG_DATA_FILE_SUFFIX = "data";
+  private static final String LOG_INDEX_FILE_SUFFIX = "idx";
+
+  /**
+   * the log data files
+   */
+  private List<File> logDataFileList;
+
+  /**
+   * the log index files
+   */
+  private List<File> logIndexFileList;
 
-  List<File> logFileList;
   private LogParser parser = LogParser.getINSTANCE();
   private File metaFile;
-  private FileOutputStream currentLogOutputStream;
-  private Deque<Integer> logSizeDeque = new ArrayDeque<>();
+  private FileOutputStream currentLogDataOutputStream;
+  private FileOutputStream currentLogIndexOutputStream;
   private LogManagerMeta meta;
   private HardState state;
-  // mark first log position
-  private long firstLogPosition = 0;
-  // removed log size
-  private long removedLogSize = 0;
-  // when the removedLogSize larger than this, we actually delete logs
-  private long maxRemovedLogSize = ClusterDescriptor.getInstance().getConfig()
-      .getMaxUnsnapshotLogSize();
-  // min version of available log
+
+  /**
+   * min version of available log
+   */
   private long minAvailableVersion = 0;
-  // max version of available log
+
+  /**
+   * max version of available log
+   */
   private long maxAvailableVersion = Long.MAX_VALUE;
-  // log dir
+
   private String logDir;
-  // version controller
+
   private VersionController versionController;
 
-  private ByteBuffer logBuffer = ByteBuffer
+  private ByteBuffer logDataBuffer = ByteBuffer
       .allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+  private ByteBuffer logIndexBuffer = ByteBuffer

Review comment:
       As you already use `logIndexOffsetList` to record offsets in `putLog`, I doubt whether it's necessary to also maintain this buffer in `putLog`. Maybe you can generate the index buffer in `flushLogBuffer`.







[GitHub] [iotdb] jt2594838 commented on a change in pull request #1865: [IOTDB-915]Add raft log persist mechanism and use persist log to catch up

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on a change in pull request #1865:
URL: https://github.com/apache/iotdb/pull/1865#discussion_r514949721



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
##########
@@ -191,12 +191,11 @@ private AppendEntriesRequest prepareRequest(List<ByteBuffer> logList, int startP
         logger.error("getTerm failed for newly append entries", e);
       }
     }
+    logger.debug("{}, node={} catchup request={}", raftMember.getName(), node, request.toString());

Review comment:
       Replace `request.toString()` with simply `request`.
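       The parameterized form also defers the `toString()` call until DEBUG is actually enabled:
       ```java
       // SLF4J formats lazily; request.toString() only runs if the message is logged:
       logger.debug("{}, node={} catchup request={}", raftMember.getName(), node, request);
       ```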

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -160,31 +216,28 @@ public LogManagerMeta getMeta() {
    * Recover all the logs in disk. This function will be called once this instance is created.
    */
   @Override
-  public List<Log> getAllEntries() {
-    List<Log> logs = recoverLog();
-    int size = logs.size();
-    if (size != 0 && meta.getLastLogIndex() <= logs.get(size - 1).getCurrLogIndex()) {
-      meta.setLastLogTerm(logs.get(size - 1).getCurrLogTerm());
-      meta.setLastLogIndex(logs.get(size - 1).getCurrLogIndex());
-      meta.setCommitLogTerm(logs.get(size - 1).getCurrLogTerm());
-      meta.setCommitLogIndex(logs.get(size - 1).getCurrLogIndex());
+  public List<Log> getAllEntriesBeforeAppliedIndex() {
+    logger.debug("getAllEntriesBeforeAppliedIndex, maxHaveAppliedCommitIndex={}, commitLogIndex={}",
+        meta.getMaxHaveAppliedCommitIndex(), meta.getCommitLogIndex());
+    if (meta.getMaxHaveAppliedCommitIndex() >= meta.getCommitLogIndex()) {
+      return Collections.emptyList();
     }
-    return logs;
+    return getLogs(meta.getMaxHaveAppliedCommitIndex(), meta.getCommitLogIndex());

Review comment:
       It seems more like `getAllEntriesAfterAppliedIndex` than `getAllEntriesBeforeAppliedIndex`.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -203,47 +256,113 @@ public void append(List<Log> entries) throws IOException {
    */
   private void putLogs(List<Log> entries) {
     for (Log log : entries) {
-      logBuffer.mark();
+      logDataBuffer.mark();
+      logIndexBuffer.mark();
       ByteBuffer logData = log.serialize();
       int size = logData.capacity() + Integer.BYTES;
       try {
-        logBuffer.putInt(logData.capacity());
-        logBuffer.put(logData);
-        logSizeDeque.addLast(size);
-        bufferedLogNum++;
+        logDataBuffer.putInt(logData.capacity());
+        logDataBuffer.put(logData);
+        logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
+        logIndexOffsetList.add(offsetOfTheCurrentLogDataOutputStream);
+        offsetOfTheCurrentLogDataOutputStream += size;
       } catch (BufferOverflowException e) {
         logger.info("Raft log buffer overflow!");
-        logBuffer.reset();
+        logDataBuffer.reset();
+        logIndexBuffer.reset();
         flushLogBuffer();
-        logBuffer.putInt(logData.capacity());
-        logBuffer.put(logData);
-        logSizeDeque.addLast(size);
-        bufferedLogNum++;
+        checkCloseCurrentFile(log.getCurrLogIndex() - 1);
+        logDataBuffer.putInt(logData.capacity());
+        logDataBuffer.put(logData);
+        logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
+        logIndexOffsetList.add(offsetOfTheCurrentLogDataOutputStream);
+        offsetOfTheCurrentLogDataOutputStream += size;
+      }
+    }
+  }
+
+  private void checkCloseCurrentFile(long commitIndex) {
+    if (offsetOfTheCurrentLogDataOutputStream > maxRaftLogPersistDataSizePerFile) {
+      try {
+        closeCurrentFile(commitIndex);
+        serializeMeta(meta);
+        createNewLogFile(logDir, commitIndex + 1);
+      } catch (IOException e) {
+        logger.error("check close current file failed", e);
       }
     }
   }
 
+  private void closeCurrentFile(long commitIndex) throws IOException {
+    lock.writeLock().lock();
+    try {
+      if (currentLogDataOutputStream != null) {
+        currentLogDataOutputStream.close();
+        currentLogDataOutputStream = null;
+      }
+
+      if (currentLogIndexOutputStream != null) {
+        currentLogIndexOutputStream.close();
+        currentLogIndexOutputStream = null;
+      }
+      File currentLogDataFile = getCurrentLogDataFile();
+      String newDataFileName = currentLogDataFile.getName()
+          .replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(commitIndex));
+      File newCurrentLogDatFile = SystemFileFactory.INSTANCE
+          .getFile(currentLogDataFile.getParent() + File.separator + newDataFileName);
+      if (!currentLogDataFile.renameTo(newCurrentLogDatFile)) {
+        logger.error("rename log data file={} failed", currentLogDataFile.getAbsoluteFile());
+      }
+      logDataFileList.remove(logDataFileList.size() - 1);
+      logDataFileList.add(newCurrentLogDatFile);

Review comment:
       Maybe `list.set()` is enough for this.
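       I.e. a single in-place update:
       ```java
       // Replace the last element instead of remove()+add():
       logDataFileList.set(logDataFileList.size() - 1, newCurrentLogDatFile);
       ```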

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -50,82 +53,140 @@
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SyncLogDequeSerializer implements StableEntryManager {
 
   private static final Logger logger = LoggerFactory.getLogger(SyncLogDequeSerializer.class);
-  private static final String LOG_FILE_PREFIX = ".data";
+  private static final String LOG_DATA_FILE_SUFFIX = "data";
+  private static final String LOG_INDEX_FILE_SUFFIX = "idx";
+
+  /**
+   * the log data files
+   */
+  private List<File> logDataFileList;
+
+  /**
+   * the log index files
+   */
+  private List<File> logIndexFileList;
 
-  List<File> logFileList;
   private LogParser parser = LogParser.getINSTANCE();
   private File metaFile;
-  private FileOutputStream currentLogOutputStream;
-  private Deque<Integer> logSizeDeque = new ArrayDeque<>();
+  private FileOutputStream currentLogDataOutputStream;
+  private FileOutputStream currentLogIndexOutputStream;
   private LogManagerMeta meta;
   private HardState state;
-  // mark first log position
-  private long firstLogPosition = 0;
-  // removed log size
-  private long removedLogSize = 0;
-  // when the removedLogSize larger than this, we actually delete logs
-  private long maxRemovedLogSize = ClusterDescriptor.getInstance().getConfig()
-      .getMaxUnsnapshotLogSize();
-  // min version of available log
+
+  /**
+   * min version of available log
+   */
   private long minAvailableVersion = 0;
-  // max version of available log
+
+  /**
+   * max version of available log
+   */
   private long maxAvailableVersion = Long.MAX_VALUE;
-  // log dir
+
   private String logDir;
-  // version controller
+
   private VersionController versionController;
 
-  private ByteBuffer logBuffer = ByteBuffer
+  private ByteBuffer logDataBuffer = ByteBuffer
       .allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+  private ByteBuffer logIndexBuffer = ByteBuffer
+      .allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+
+  private long offsetOfTheCurrentLogDataOutputStream = 0;
+
+  /**
+   * file name pattern:
+   * <p>
+   * for log data file: ${startTime}-${Long.MAX_VALUE}-{version}-data
+   * <p>
+   * for log index file: ${startTime}-${Long.MAX_VALUE}-{version}-idx

Review comment:
       Is it really `startTime`? I think it should be made clearer here.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -349,9 +495,136 @@ private void checkLogFile(File file) {
       } catch (IOException e) {
         logger.warn("Cannot delete outdated log file {}", file);
       }
+      return false;
+    }
+
+    String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
+    // start index should be smaller than end index
+    if (Long.parseLong(splits[0]) > Long.parseLong(splits[1])) {
+      try {
+        Files.delete(file.toPath());
+      } catch (IOException e) {
+        logger.warn("Cannot delete incorrect log file {}", file);
+      }
+      return false;
+    }
+    return true;
+  }
+
+  private void recoverTheLastLogFile() {
+    if (logIndexFileList.isEmpty()) {
+      logger.info("no log index file to recover");
+      return;
+    }
+
+    File lastIndexFile = logIndexFileList.get(logIndexFileList.size() - 1);
+    long endIndex = Long.parseLong(lastIndexFile.getName().split(FILE_NAME_SEPARATOR)[1]);
+    boolean success = true;
+    if (endIndex != Long.MAX_VALUE) {
+      logger.info("last log index file={} no need to recover", lastIndexFile.getAbsoluteFile());
+    } else {
+      success = recoverTheLastLogIndexFile(lastIndexFile);
+    }
+
+    if (!success) {
+      logger.error("recover log index file failed, clear all logs in disk, {}",
+          lastIndexFile.getAbsoluteFile());
+      for (int i = 0; i < logIndexFileList.size(); i++) {
+        deleteLogDataAndIndexFile(i);
+      }
+      clearFirstLogIndex();
+
+      return;
+    }
+
+    File lastDataFile = logDataFileList.get(logDataFileList.size() - 1);
+    endIndex = Long.parseLong(lastDataFile.getName().split(FILE_NAME_SEPARATOR)[1]);
+    if (endIndex != Long.MAX_VALUE) {
+      logger.info("last log data file={} no need to recover", lastDataFile.getAbsoluteFile());
+      return;
+    }
+
+    success = recoverTheLastLogDataFile(logDataFileList.get(logDataFileList.size() - 1));
+    if (!success) {
+      logger.error("recover log data file failed, clear all logs in disk,{}",
+          lastDataFile.getAbsoluteFile());
+      for (int i = 0; i < logIndexFileList.size(); i++) {
+        deleteLogDataAndIndexFile(i);
+      }
+      clearFirstLogIndex();
+    }
+  }
+
+  private boolean recoverTheLastLogDataFile(File file) {
+    String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
+    long startIndex = Long.parseLong(splits[0]);
+    Pair<File, Pair<Long, Long>> fileStartAndEndIndex = getLogIndexFile(startIndex);
+    if (fileStartAndEndIndex.right.left == startIndex) {
+      long endIndex = fileStartAndEndIndex.right.right;
+      String newDataFileName = file.getName()
+          .replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(endIndex));
+      File newLogDataFile = SystemFileFactory.INSTANCE
+          .getFile(file.getParent() + File.separator + newDataFileName);
+      if (!file.renameTo(newLogDataFile)) {
+        logger.error("rename log data file={} failed when recover", file.getAbsoluteFile());
+      }
+      logDataFileList.remove(logDataFileList.size() - 1);
+      logDataFileList.add(newLogDataFile);
+      return true;
+    }
+    return false;
+  }
+
+  private boolean recoverTheLastLogIndexFile(File file) {
+    logger.debug("start to recover the last log index file={}", file.getAbsoluteFile());
+    String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
+    long startIndex = Long.parseLong(splits[0]);
+    int longLength = 8;
+    byte[] bytes = new byte[longLength];
+
+    int totalCount = 0;
+    long offset = 0;
+    try (FileInputStream inputStream = new FileInputStream(file)) {

Review comment:
       Better to use a buffered stream, e.g.:
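       A sketch (the read loop itself stays as it is):
       ```java
       // BufferedInputStream avoids one syscall per 8-byte offset read:
       try (DataInputStream in =
           new DataInputStream(new BufferedInputStream(new FileInputStream(file)))) {
         // read the fixed-size offsets exactly as before, e.g. in.readLong()
       }
       ```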

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -593,75 +815,385 @@ public void close() {
   }
 
   /**
-   * adjust maxRemovedLogSize to the first log file
+   * get file version from file The file name structure is as follows:
+   * {startLogIndex}-{endLogIndex}-{version}-data)
+   *
+   * @param file file
+   * @return version from file
+   */
+  private long getFileVersion(File file) {
+    return Long.parseLong(file.getName().split(FILE_NAME_SEPARATOR)[2]);
+  }
+
+  public void checkDeletePersistRaftLog() {
+    // 1. check the log index offset list size
+    try {
+      lock.writeLock().lock();
+      if (logIndexOffsetList.size() > maxRaftLogIndexSizeInMemory) {
+        int compactIndex = logIndexOffsetList.size() - maxRaftLogIndexSizeInMemory;
+        logIndexOffsetList.subList(0, compactIndex).clear();
+        firstLogIndex += compactIndex;
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+
+    // 2. check the persist log file number
+    while (logDataFileList.size() > maxNumberOfPersistRaftLogFiles) {
+      deleteLogDataAndIndexFile(0);
+    }
+
+    // 3. check the persist log index number
+    while (!logDataFileList.isEmpty()) {
+      File firstFile = logDataFileList.get(0);
+      String[] splits = firstFile.getName().split(FILE_NAME_SEPARATOR);
+      if (meta.getCommitLogIndex() - Long.parseLong(splits[1]) > maxPersistRaftLogNumberOnDisk) {
+        deleteLogDataAndIndexFile(0);
+      } else {
+        return;
+      }
+    }

Review comment:
       If `maxPersistRaftLogNumberOnDisk` is too small, is it possible that the current file will be deleted here?
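       One possible guard, reusing the PR's own names (only the `size() > 1` condition is new):
       ```java
       // Never delete the newest file, which is still being written to:
       while (logDataFileList.size() > 1) {
         File firstFile = logDataFileList.get(0);
         long endIndex = Long.parseLong(firstFile.getName().split(FILE_NAME_SEPARATOR)[1]);
         if (meta.getCommitLogIndex() - endIndex > maxPersistRaftLogNumberOnDisk) {
           deleteLogDataAndIndexFile(0);
         } else {
           break;
         }
       }
       ```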

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -593,75 +815,385 @@ public void close() {
   }
 
   /**
-   * adjust maxRemovedLogSize to the first log file
+   * get file version from file The file name structure is as follows:
+   * {startLogIndex}-{endLogIndex}-{version}-data)
+   *
+   * @param file file
+   * @return version from file
+   */
+  private long getFileVersion(File file) {
+    return Long.parseLong(file.getName().split(FILE_NAME_SEPARATOR)[2]);
+  }
+
+  public void checkDeletePersistRaftLog() {
+    // 1. check the log index offset list size
+    try {
+      lock.writeLock().lock();
+      if (logIndexOffsetList.size() > maxRaftLogIndexSizeInMemory) {
+        int compactIndex = logIndexOffsetList.size() - maxRaftLogIndexSizeInMemory;
+        logIndexOffsetList.subList(0, compactIndex).clear();
+        firstLogIndex += compactIndex;
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+
+    // 2. check the persist log file number
+    while (logDataFileList.size() > maxNumberOfPersistRaftLogFiles) {
+      deleteLogDataAndIndexFile(0);
+    }
+
+    // 3. check the persist log index number
+    while (!logDataFileList.isEmpty()) {
+      File firstFile = logDataFileList.get(0);
+      String[] splits = firstFile.getName().split(FILE_NAME_SEPARATOR);
+      if (meta.getCommitLogIndex() - Long.parseLong(splits[1]) > maxPersistRaftLogNumberOnDisk) {
+        deleteLogDataAndIndexFile(0);
+      } else {
+        return;
+      }
+    }
+  }
+
+  private void deleteLogDataAndIndexFile(int index) {
+    File logDataFile = null;
+    File logIndexFile = null;
+    try {
+      lock.writeLock().lock();
+      logDataFile = logDataFileList.get(index);
+      logIndexFile = logIndexFileList.get(index);
+      Files.delete(logDataFile.toPath());
+      Files.delete(logIndexFile.toPath());
+      logDataFileList.remove(index);
+      logIndexFileList.remove(index);
+      logger.debug("delete date file={}, index file={}", logDataFile.getAbsoluteFile(),
+          logIndexFile.getAbsoluteFile());
+    } catch (IOException e) {
+      logger.error("delete file failed, index={}, data file={}, index file={}", index,
+          logDataFile == null ? null : logDataFile.getAbsoluteFile(),
+          logIndexFile == null ? null : logIndexFile.getAbsoluteFile());
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * The file name structure is as follows: {startLogIndex}-{endLogIndex}-{version}-data)
+   *
+   * @param file1 File to compare
+   * @param file2 File to compare
    */
-  private void adjustNextThreshold() {
-    if (!logFileList.isEmpty()) {
-      maxRemovedLogSize = logFileList.get(0).length();
+  private int comparePersistLogFileName(File file1, File file2) {
+    String[] items1 = file1.getName().split(FILE_NAME_SEPARATOR);
+    String[] items2 = file2.getName().split(FILE_NAME_SEPARATOR);
+    if (items1.length != FILE_NAME_PART_LENGTH || items2.length != FILE_NAME_PART_LENGTH) {
+      logger.error(
+          "file1={}, file2={} name should be in the following format: startLogIndex-endLogIndex-version-data",
+          file1.getAbsoluteFile(), file2.getAbsoluteFile());
+    }
+    long startLogIndex1 = Long.parseLong(items1[0]);
+    long startLogIndex2 = Long.parseLong(items2[0]);
+    int res = Long.compare(startLogIndex1, startLogIndex2);
+    if (res == 0) {
+      return Long.compare(Long.parseLong(items1[1]), Long.parseLong(items2[1]));
     }
+    return res;
   }
 
   /**
-   * actually delete the data file which only contains removed data
+   * @param startIndex the log start index
+   * @param endIndex   the log end index
+   * @return the raft log which index between [startIndex, endIndex] or empty if not found
    */
-  private void actuallyDeleteFile() {
-    Iterator<File> logFileIterator = logFileList.iterator();
-    while (logFileIterator.hasNext()) {
-      File logFile = logFileIterator.next();
-      if (logger.isDebugEnabled()) {
-        logger.debug("Examining file for removal, file: {}, len: {}, removedLogSize: {}", logFile
-            , logFile.length(), removedLogSize);
-      }
-      if (logFile.length() > removedLogSize) {
-        break;
-      }
-
-      logger.info("Removing a log file {}, len: {}, removedLogSize: {}", logFile,
-          logFile.length(), removedLogSize);
-      removedLogSize -= logFile.length();
-      // if system down before delete, we can use this to delete file during recovery
-      minAvailableVersion = getFileVersion(logFile);
-      serializeMeta(meta);
+  @Override
+  public List<Log> getLogs(long startIndex, long endIndex) {

Review comment:
       I think we should enforce a limit in this method to avoid running out of memory when the range is too long.
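       E.g. a cap like the following (the constant name is an assumption):
       ```java
       // Bound how many entries are materialized per call; callers can page through:
       long cappedEndIndex = Math.min(endIndex, startIndex + MAX_LOG_FETCH_SIZE_ON_DISK - 1);
       ```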







[GitHub] [iotdb] jt2594838 commented on pull request #1865: [IOTDB-915]Add raft log persist mechanism and use persist log to catch up

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on pull request #1865:
URL: https://github.com/apache/iotdb/pull/1865#issuecomment-720847777


   It seems we got a compilation error.





[GitHub] [iotdb] jt2594838 merged pull request #1865: [IOTDB-915]Add raft log persist mechanism and use persist log to catch up

Posted by GitBox <gi...@apache.org>.
jt2594838 merged pull request #1865:
URL: https://github.com/apache/iotdb/pull/1865


   

