Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/27 14:05:30 UTC

[incubator-iotdb] 01/01: append

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit a66c8f5eb3dcfd9f5fe9283caf591ae42ba1efd7
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Mon Jul 27 22:04:45 2020 +0800

    append
---
 .../resources/conf/iotdb-cluster.properties        | 16 ++++
 .../apache/iotdb/cluster/config/ClusterConfig.java | 38 +++++++++
 .../iotdb/cluster/config/ClusterDescriptor.java    | 17 ++++
 .../iotdb/cluster/log/StableEntryManager.java      |  3 +-
 .../serializable/SyncLogDequeSerializer.java       | 92 ++++++++++++++++------
 5 files changed, 143 insertions(+), 23 deletions(-)

diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 57b1e54..fd8059a 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -102,5 +102,21 @@ enable_auto_create_schema=true
 # Weak consistency does not synchronize with the leader and simply uses the local data
 consistency_level=mid
 
+# Whether to enable raft log persistence
+is_enable_raft_log_persistence=true
+
+# When the number of buffered raft log entries reaches this threshold, they are flushed to disk
+# It is possible to lose at most flush_raft_log_threshold operations on an unexpected shutdown
+flush_raft_log_threshold=10000
+
+# The period (in milliseconds) at which buffered raft logs are forced to disk
+# If force_raft_log_period_in_ms = 0, the raft log is forced to disk after every write
+# Setting this parameter to 0 may slow down ingestion on slow disks
+force_raft_log_period_in_ms=10
+
+# Size (in bytes) of the in-memory buffer for raft logs before they are flushed to disk
+raft_log_buffer_size=16777216
+
+
+
 
 
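The four keys above are read back into ClusterConfig by the ClusterDescriptor change further down in this commit. As a rough standalone sketch of that lookup (the class name, file path, and printout below are illustrative only, not part of the commit), they can be loaded with java.util.Properties using the same defaults:

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Properties;

    public class RaftLogPropertiesSketch {

      public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        // Illustrative path; in a real deployment the file sits under the IoTDB conf directory.
        try (FileInputStream in = new FileInputStream("conf/iotdb-cluster.properties")) {
          properties.load(in);
        }

        // Fall back to the same defaults the commit compiles into ClusterConfig.
        boolean persistenceEnabled = Boolean.parseBoolean(
            properties.getProperty("is_enable_raft_log_persistence", "true"));
        int flushThreshold = Integer.parseInt(
            properties.getProperty("flush_raft_log_threshold", "10000"));
        int forcePeriodInMs = Integer.parseInt(
            properties.getProperty("force_raft_log_period_in_ms", "10"));
        int bufferSizeInBytes = Integer.parseInt(
            properties.getProperty("raft_log_buffer_size", String.valueOf(16 * 1024 * 1024)));

        System.out.printf("persistence=%b, flushThreshold=%d, forcePeriod=%dms, bufferSize=%dB%n",
            persistenceEnabled, flushThreshold, forcePeriodInMs, bufferSizeInBytes);
      }
    }
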
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index ebdda72..9d6d8b7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -99,6 +99,17 @@ public class ClusterConfig {
 
   private boolean enableRaftLogPersistence = true;
 
+
+  private int flushRaftLogThreshold = 10000;
+
+  private int forceRaftLogPeriodInMS = 10;
+
+  /**
+   * Size (in bytes) of the raft log buffer. If raft log persistence is enabled and a single
+   * serialized log entry is larger than this buffer, the append fails with an IOException.
+   */
+  private int raftLogBufferSize = 16 * 1024 * 1024;
+
   /**
    * consistency level, now three consistency levels are supported: strong, mid and weak. Strong
    * consistency means the server will first try to synchronize with the leader to get the newest
@@ -109,6 +120,8 @@ public class ClusterConfig {
    */
   private ConsistencyLevel consistencyLevel = ConsistencyLevel.MID_CONSISTENCY;
 
+
+
   public int getSelectorNumOfClientPool() {
     return selectorNumOfClientPool;
   }
@@ -300,4 +313,29 @@ public class ClusterConfig {
   public void setMaxNumOfLogsInMem(int maxNumOfLogsInMem) {
     this.maxNumOfLogsInMem = maxNumOfLogsInMem;
   }
+
+  public int getRaftLogBufferSize() {
+    return raftLogBufferSize;
+  }
+
+  public void setRaftLogBufferSize(int raftLogBufferSize) {
+    this.raftLogBufferSize = raftLogBufferSize;
+  }
+
+  public int getFlushRaftLogThreshold() {
+    return flushRaftLogThreshold;
+  }
+
+  public void setFlushRaftLogThreshold(int flushRaftLogThreshold) {
+    this.flushRaftLogThreshold = flushRaftLogThreshold;
+  }
+
+  public int getForceRaftLogPeriodInMS() {
+    return forceRaftLogPeriodInMS;
+  }
+
+  public void setForceRaftLogPeriodInMS(int forceRaftLogPeriodInMS) {
+    this.forceRaftLogPeriodInMS = forceRaftLogPeriodInMS;
+  }
+
 }
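Elsewhere in this commit the buffer size is consumed through the ClusterDescriptor singleton, for example when SyncLogDequeSerializer allocates its log buffer. A minimal sketch of that pattern, assuming the cluster module is on the classpath (the class name below is illustrative):

    import java.nio.ByteBuffer;

    import org.apache.iotdb.cluster.config.ClusterDescriptor;

    public class RaftLogBufferSketch {

      public static void main(String[] args) {
        // Mirrors the allocation added to SyncLogDequeSerializer in this commit.
        ByteBuffer logBuffer = ByteBuffer.allocate(
            ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
        System.out.println("raft log buffer capacity: " + logBuffer.capacity() + " bytes");
      }
    }
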
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index a37d84c..75f65a6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -244,6 +244,23 @@ public class ClusterDescriptor {
         .getProperty("enable_auto_create_schema",
             String.valueOf(config.isEnableAutoCreateSchema()))));
 
+    config.setEnableRaftLogPersistence(
+        Boolean.parseBoolean(properties.getProperty("is_enable_raft_log_persistence",
+            String.valueOf(config.isEnableRaftLogPersistence()))));
+
+    config.setFlushRaftLogThreshold(Integer.parseInt(properties
+        .getProperty("flush_raft_log_threshold", String.valueOf(config.getFlushRaftLogThreshold())))
+    );
+
+    config.setForceRaftLogPeriodInMS(Integer.parseInt(properties
+        .getProperty("force_raft_log_period_in_ms",
+            String.valueOf(config.getForceRaftLogPeriodInMS())))
+    );
+
+    config.setRaftLogBufferSize(Integer.parseInt(properties
+        .getProperty("raft_log_buffer_size", String.valueOf(config.getRaftLogBufferSize())))
+    );
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
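Note that the new options are parsed with Integer.parseInt, so the configured values must be plain base-10 integers: bytes for raft_log_buffer_size, milliseconds for force_raft_log_period_in_ms, and a log count for flush_raft_log_threshold. A value with a unit suffix makes the property loading throw a NumberFormatException, as this small standalone snippet (not IoTDB code) illustrates:

    public class IntegerPropertySketch {

      public static void main(String[] args) {
        System.out.println(Integer.parseInt("16777216"));   // accepted: 16 MB expressed in bytes
        try {
          Integer.parseInt("16MB");                          // unit suffixes are not accepted
        } catch (NumberFormatException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }
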
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
index 8edfd67..38c6d96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
@@ -19,13 +19,14 @@
 
 package org.apache.iotdb.cluster.log;
 
+import java.io.IOException;
 import java.util.List;
 
 public interface StableEntryManager {
 
   List<Log> getAllEntries();
 
-  void append(List<Log> entries);
+  void append(List<Log> entries) throws IOException;
 
   void removeCompactedEntries(long index);
 
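The SyncLogDequeSerializer diff below provides the implementation behind the new throwing signature: entries are serialized into an in-memory ByteBuffer and flushed once a count threshold is reached. The following self-contained sketch shows the same buffering pattern with simplified names (byte[] entries, a ByteArrayOutputStream standing in for the log file, synchronized instead of the read-write lock); it illustrates the technique and is not the actual IoTDB class:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.BufferOverflowException;
    import java.nio.ByteBuffer;
    import java.util.List;

    public class BufferedAppendSketch {

      // In IoTDB these sizes come from raft_log_buffer_size and flush_raft_log_threshold.
      private final ByteBuffer logBuffer = ByteBuffer.allocate(16 * 1024 * 1024);
      private final int flushThreshold = 10_000;
      // Stand-in for the on-disk log file.
      private final ByteArrayOutputStream logFile = new ByteArrayOutputStream();
      private int bufferedLogNum = 0;

      public synchronized void append(List<byte[]> entries) throws IOException {
        try {
          for (byte[] entry : entries) {
            logBuffer.mark();
            try {
              logBuffer.put(entry);
            } catch (BufferOverflowException e) {
              // Buffer is full: restore the marked position, flush what is buffered, retry once.
              logBuffer.reset();
              flushLogBuffer();
              logBuffer.put(entry); // an entry larger than the whole buffer falls through to the outer catch
            }
            bufferedLogNum++;
          }
          if (bufferedLogNum >= flushThreshold) {
            flushLogBuffer();
          }
        } catch (BufferOverflowException e) {
          throw new IOException("log entry larger than the raft log buffer", e);
        }
      }

      private synchronized void flushLogBuffer() throws IOException {
        if (bufferedLogNum == 0) {
          return;
        }
        logBuffer.flip();
        byte[] bytes = new byte[logBuffer.remaining()];
        logBuffer.get(bytes);
        logFile.write(bytes);   // the real code writes through ReadWriteIOUtils to a file stream
        logBuffer.clear();
        bufferedLogNum = 0;
      }
    }

Buffering by entry count keeps the flush frequency bounded regardless of entry size, while the BufferOverflowException path guards against a single entry that cannot fit even into an empty buffer.
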
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
index ef31959..d116a45 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
@@ -23,6 +23,7 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
@@ -81,6 +82,12 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   // version controller
   private VersionController versionController;
 
+  private ByteBuffer logBuffer = ByteBuffer
+      .allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+
+  private int bufferedLogNum = 0;
+
+
   /**
    * the lock uses when change the logSizeDeque
    */
@@ -158,13 +165,74 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   }
 
   @Override
-  public void append(List<Log> entries) {
+  public void append(List<Log> entries) throws IOException {
     Log entry = entries.get(entries.size() - 1);
     meta.setCommitLogIndex(entry.getCurrLogIndex());
     meta.setCommitLogTerm(entry.getCurrLogTerm());
     meta.setLastLogIndex(entry.getCurrLogIndex());
     meta.setLastLogTerm(entry.getCurrLogTerm());
-    appendInternal(entries);
+
+    lock.writeLock().lock();
+    try {
+      putLogs(entries);
+      if (bufferedLogNum >= ClusterDescriptor.getInstance().getConfig().getFlushRaftLogThreshold()) {
+        flushLogBuffer();
+      }
+    } catch (BufferOverflowException e) {
+      throw new IOException(
+          "Log cannot fit into the buffer, please increase raft_log_buffer_size; "
+              + "otherwise, please increase the JVM memory", e);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Put each log in entries into the local buffer. If the buffer overflows, flush it to disk
+   * and retry the log once; an entry that is larger than the whole buffer propagates a
+   * BufferOverflowException to the caller.
+   *
+   * @param entries logs to put into the buffer
+   */
+  private void putLogs(List<Log> entries) {
+    for (Log log : entries) {
+      logBuffer.mark();
+      try {
+        logBuffer.put(log.serialize());
+      } catch (BufferOverflowException e) {
+        logger.info("Raft log buffer overflow!");
+        logBuffer.reset();
+        flushLogBuffer();
+        logBuffer.put(log.serialize());
+      }
+      bufferedLogNum++;
+    }
+  }
+
+  /**
+   * Flush the current log buffer to disk and reset it.
+   */
+  private void flushLogBuffer() {
+    lock.writeLock().lock();
+    try {
+      if (bufferedLogNum == 0) {
+        return;
+      }
+      // write the buffered logs into disk
+      try {
+        checkStream();
+        ReadWriteIOUtils.writeWithoutSize(logBuffer, currentLogOutputStream);
+      } catch (IOException e) {
+        logger.error("Error in logs serialization: ", e);
+        return;
+      }
+      logBuffer.clear();
+      bufferedLogNum = 0;
+      logger.debug("End flushing log buffer.");
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   @Override
@@ -325,26 +393,6 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     return logFile;
   }
 
-  public void append(Log log) {
-    ByteBuffer data = log.serialize();
-    int totalSize = 0;
-    // write into disk
-    try {
-      checkStream();
-      totalSize = ReadWriteIOUtils.write(data, currentLogOutputStream);
-    } catch (IOException e) {
-      logger.error("Error in appending log {} ", log, e);
-    }
-
-    lock.writeLock().lock();
-    try {
-      logSizeDeque.addLast(totalSize);
-    } finally {
-      lock.writeLock().unlock();
-    }
-
-  }
-
   public void appendInternal(List<Log> logs) {
     int bufferSize = 0;
     List<ByteBuffer> bufferList = new ArrayList<>(logs.size());