You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/29 13:19:37 UTC

[incubator-iotdb] branch kyy created (now 4a77450)

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a change to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 4a77450  flush periodically

This branch includes the following new commits:

     new 4a77450  flush periodically

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: flush periodically

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 4a77450fc701d50a988b4d4dc9c197a6e0f0cac5
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Wed Jul 29 21:19:04 2020 +0800

    flush periodically
---
 .../iotdb/cluster/log/StableEntryManager.java      |  5 +++++
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 21 ++++++++++++++++---
 .../serializable/SyncLogDequeSerializer.java       | 24 +++++++++++++++++-----
 3 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
index 38c6d96..164bfc5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
@@ -28,6 +28,11 @@ public interface StableEntryManager {
 
   void append(List<Log> entries) throws IOException;
 
+
+  void flushLogBuffer();
+
+  void forceFlushLogBuffer();
+
   void removeCompactedEntries(long index);
 
   void setHardStateAndFlush(HardState state);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index d998f7d..748a261 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -100,6 +100,16 @@ abstract public class RaftLogManager {
         .scheduleAtFixedRate(this::checkDeleteLog, logDeleteCheckIntervalSecond,
             logDeleteCheckIntervalSecond,
             TimeUnit.SECONDS);
+
+    /**
+     * flush log to file periodically
+     */
+    int logFlushTimeIntervalMS = ClusterDescriptor.getInstance().getConfig()
+        .getForceRaftLogPeriodInMS();
+    if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
+      executorService.scheduleAtFixedRate(this::flushLogPeriodically, logFlushTimeIntervalMS,
+          logFlushTimeIntervalMS, TimeUnit.MILLISECONDS);
+    }
   }
 
   abstract public Snapshot getSnapshot();
@@ -160,8 +170,7 @@ abstract public class RaftLogManager {
    *
    * @param index request entry index
    * @return throw EntryCompactedException if index < dummyIndex, -1 if index > lastIndex or the
-   * entry is compacted, otherwise
-   * return the entry's term for given index
+   * entry is compacted, otherwise return the entry's term for given index
    * @throws EntryCompactedException
    */
   public long getTerm(long index) throws EntryCompactedException {
@@ -431,7 +440,7 @@ abstract public class RaftLogManager {
           applyEntries(entries, ignoreExecutionExceptions);
         } catch (TruncateCommittedEntryException e) {
           logger.error("{}: Unexpected error:", name, e);
-        } catch (IOException e){
+        } catch (IOException e) {
           throw new LogExecutionException(e);
         }
       }
@@ -589,6 +598,12 @@ abstract public class RaftLogManager {
     }
   }
 
+  public void flushLogPeriodically() {
+    synchronized (this) {
+      getStableEntryManager().flushLogBuffer();
+    }
+  }
+
   private void innerDeleteLog() {
     long removeSize = committedEntryManager.getTotalSize() - minNumOfLogsInMem;
     long compactIndex = committedEntryManager.getDummyIndex() + removeSize;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
index 6a3d529..61306b6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
@@ -216,16 +216,16 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /**
-   * Flush current log buffer to the disk.
-   */
-  private void flushLogBuffer() {
+  @Override
+  public void flushLogBuffer() {
+
     lock.writeLock().lock();
     try {
       if (bufferedLogNum == 0) {
         return;
       }
       // write into disk
+      System.out.println(">>>>>>>>>>>>>>>>>>> flush log");
       try {
         checkStream();
         ReadWriteIOUtils
@@ -243,6 +243,20 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   }
 
   @Override
+  public void forceFlushLogBuffer() {
+    flushLogBuffer();
+    lock.writeLock().lock();
+    try {
+      currentLogOutputStream.getChannel().force(true);
+    } catch (IOException e) {
+      logger.error("Error when force flushing logs serialization: ", e);
+      return;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
   public void setHardStateAndFlush(HardState state) {
     this.state = state;
     serializeMeta(meta);
@@ -616,7 +630,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   @Override
   public void close() {
-    flushLogBuffer();
+    forceFlushLogBuffer();
     lock.writeLock().lock();
     try {
       if (currentLogOutputStream != null) {