You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/29 13:19:38 UTC

[incubator-iotdb] 01/01: flush periodically

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 4a77450fc701d50a988b4d4dc9c197a6e0f0cac5
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Wed Jul 29 21:19:04 2020 +0800

    flush periodically
---
 .../iotdb/cluster/log/StableEntryManager.java      |  5 +++++
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 21 ++++++++++++++++---
 .../serializable/SyncLogDequeSerializer.java       | 24 +++++++++++++++++-----
 3 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
index 38c6d96..164bfc5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
@@ -28,6 +28,11 @@ public interface StableEntryManager {
 
   void append(List<Log> entries) throws IOException;
 
+
+  void flushLogBuffer();
+
+  void forceFlushLogBuffer();
+
   void removeCompactedEntries(long index);
 
   void setHardStateAndFlush(HardState state);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index d998f7d..748a261 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -100,6 +100,16 @@ abstract public class RaftLogManager {
         .scheduleAtFixedRate(this::checkDeleteLog, logDeleteCheckIntervalSecond,
             logDeleteCheckIntervalSecond,
             TimeUnit.SECONDS);
+
+    /**
+     * flush log to file periodically
+     */
+    int logFlushTimeIntervalMS = ClusterDescriptor.getInstance().getConfig()
+        .getForceRaftLogPeriodInMS();
+    if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
+      executorService.scheduleAtFixedRate(this::flushLogPeriodically, logFlushTimeIntervalMS,
+          logFlushTimeIntervalMS, TimeUnit.MILLISECONDS);
+    }
   }
 
   abstract public Snapshot getSnapshot();
@@ -160,8 +170,7 @@ abstract public class RaftLogManager {
    *
    * @param index request entry index
    * @return throw EntryCompactedException if index < dummyIndex, -1 if index > lastIndex or the
-   * entry is compacted, otherwise
-   * return the entry's term for given index
+   * entry is compacted, otherwise return the entry's term for given index
    * @throws EntryCompactedException
    */
   public long getTerm(long index) throws EntryCompactedException {
@@ -431,7 +440,7 @@ abstract public class RaftLogManager {
           applyEntries(entries, ignoreExecutionExceptions);
         } catch (TruncateCommittedEntryException e) {
           logger.error("{}: Unexpected error:", name, e);
-        } catch (IOException e){
+        } catch (IOException e) {
           throw new LogExecutionException(e);
         }
       }
@@ -589,6 +598,12 @@ abstract public class RaftLogManager {
     }
   }
 
+  public void flushLogPeriodically() {
+    synchronized (this) {
+      getStableEntryManager().flushLogBuffer();
+    }
+  }
+
   private void innerDeleteLog() {
     long removeSize = committedEntryManager.getTotalSize() - minNumOfLogsInMem;
     long compactIndex = committedEntryManager.getDummyIndex() + removeSize;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
index 6a3d529..61306b6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
@@ -216,16 +216,16 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /**
-   * Flush current log buffer to the disk.
-   */
-  private void flushLogBuffer() {
+  @Override
+  public void flushLogBuffer() {
+
     lock.writeLock().lock();
     try {
       if (bufferedLogNum == 0) {
         return;
       }
       // write into disk
+      System.out.println(">>>>>>>>>>>>>>>>>>> flush log");
       try {
         checkStream();
         ReadWriteIOUtils
@@ -243,6 +243,20 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   }
 
   @Override
+  public void forceFlushLogBuffer() {
+    flushLogBuffer();
+    lock.writeLock().lock();
+    try {
+      currentLogOutputStream.getChannel().force(true);
+    } catch (IOException e) {
+      logger.error("Error when force flushing logs serialization: ", e);
+      return;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
   public void setHardStateAndFlush(HardState state) {
     this.state = state;
     serializeMeta(meta);
@@ -616,7 +630,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   @Override
   public void close() {
-    flushLogBuffer();
+    forceFlushLogBuffer();
     lock.writeLock().lock();
     try {
       if (currentLogOutputStream != null) {