You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/27 14:05:29 UTC

[incubator-iotdb] branch kyy created (now a66c8f5)

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a change to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at a66c8f5  append

This branch includes the following new commits:

     new a66c8f5  append

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: append

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit a66c8f5eb3dcfd9f5fe9283caf591ae42ba1efd7
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Mon Jul 27 22:04:45 2020 +0800

    append
---
 .../resources/conf/iotdb-cluster.properties        | 16 ++++
 .../apache/iotdb/cluster/config/ClusterConfig.java | 38 +++++++++
 .../iotdb/cluster/config/ClusterDescriptor.java    | 17 ++++
 .../iotdb/cluster/log/StableEntryManager.java      |  3 +-
 .../serializable/SyncLogDequeSerializer.java       | 92 ++++++++++++++++------
 5 files changed, 143 insertions(+), 23 deletions(-)

diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 57b1e54..fd8059a 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -102,5 +102,21 @@ enable_auto_create_schema=true
 # Weak consistency do not synchronize with the leader and simply use the local data
 consistency_level=mid
 
+# Whether raft log persistence is enabled
+is_enable_raft_log_persistence=true
+
+# When the number of buffered raft logs reaches this threshold, they are flushed to disk.
+# It is possible to lose at most flush_raft_log_threshold operations.
+flush_raft_log_threshold=10000
+
+# The period (in milliseconds) at which the raft log is forced to be written to disk.
+# If force_raft_log_period_in_ms = 0, the raft log is forced to disk after each flush.
+# Setting this parameter to 0 may slow down ingestion on slow disks.
+force_raft_log_period_in_ms=10
+
+raft_log_buffer_size=16777216
+
+
+
 
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index ebdda72..9d6d8b7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -99,6 +99,17 @@ public class ClusterConfig {
 
   private boolean enableRaftLogPersistence = true;
 
+
+  private int flushRaftLogThreshold=10000;
+
+  private int forceRaftLogPeriodInMS=10;
+
+  /**
+   * Size of log buffer. If raft log persistence is enabled and the size of an insert plan
+   * is larger than this parameter, then the insert plan will be rejected by WAL.
+   */
+  private int raftLogBufferSize = 16 * 1024 * 1024;
+
   /**
    * consistency level, now three consistency levels are supported: strong, mid and weak. Strong
    * consistency means the server will first try to synchronize with the leader to get the newest
@@ -109,6 +120,8 @@ public class ClusterConfig {
    */
   private ConsistencyLevel consistencyLevel = ConsistencyLevel.MID_CONSISTENCY;
 
+
+
   public int getSelectorNumOfClientPool() {
     return selectorNumOfClientPool;
   }
@@ -300,4 +313,29 @@ public class ClusterConfig {
   public void setMaxNumOfLogsInMem(int maxNumOfLogsInMem) {
     this.maxNumOfLogsInMem = maxNumOfLogsInMem;
   }
+
+  public int getRaftLogBufferSize() {
+    return raftLogBufferSize;
+  }
+
+  public void setRaftLogBufferSize(int raftLogBufferSize) {
+    this.raftLogBufferSize = raftLogBufferSize;
+  }
+
+  public int getFlushRaftLogThreshold() {
+    return flushRaftLogThreshold;
+  }
+
+  public void setFlushRaftLogThreshold(int flushRaftLogThreshold) {
+    this.flushRaftLogThreshold = flushRaftLogThreshold;
+  }
+
+  public int getForceRaftLogPeriodInMS() {
+    return forceRaftLogPeriodInMS;
+  }
+
+  public void setForceRaftLogPeriodInMS(int forceRaftLogPeriodInMS) {
+    this.forceRaftLogPeriodInMS = forceRaftLogPeriodInMS;
+  }
+
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index a37d84c..75f65a6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -244,6 +244,23 @@ public class ClusterDescriptor {
         .getProperty("enable_auto_create_schema",
             String.valueOf(config.isEnableAutoCreateSchema()))));
 
+    config.setEnableRaftLogPersistence(
+        Boolean.parseBoolean(properties.getProperty("is_enable_raft_log_persistence",
+            String.valueOf(config.isEnableRaftLogPersistence()))));
+
+    config.setFlushRaftLogThreshold(Integer.parseInt(properties
+        .getProperty("flush_raft_log_threshold", String.valueOf(config.getFlushRaftLogThreshold())))
+    );
+
+    config.setForceRaftLogPeriodInMS(Integer.parseInt(properties
+        .getProperty("force_raft_log_period_in_ms",
+            String.valueOf(config.getForceRaftLogPeriodInMS())))
+    );
+
+    config.setRaftLogBufferSize(Integer.parseInt(properties
+        .getProperty("raft_log_buffer_size", String.valueOf(config.getRaftLogBufferSize())))
+    );
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
index 8edfd67..38c6d96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
@@ -19,13 +19,14 @@
 
 package org.apache.iotdb.cluster.log;
 
+import java.io.IOException;
 import java.util.List;
 
 public interface StableEntryManager {
 
   List<Log> getAllEntries();
 
-  void append(List<Log> entries);
+  void append(List<Log> entries) throws IOException;
 
   void removeCompactedEntries(long index);
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
index ef31959..d116a45 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
@@ -23,6 +23,7 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
@@ -81,6 +82,12 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   // version controller
   private VersionController versionController;
 
+  private ByteBuffer logBuffer = ByteBuffer
+      .allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+
+  private int bufferedLogNum = 0;
+
+
   /**
    * the lock uses when change the logSizeDeque
    */
@@ -158,13 +165,74 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   }
 
   @Override
-  public void append(List<Log> entries) {
+  public void append(List<Log> entries) throws IOException {
     Log entry = entries.get(entries.size() - 1);
     meta.setCommitLogIndex(entry.getCurrLogIndex());
     meta.setCommitLogTerm(entry.getCurrLogTerm());
     meta.setLastLogIndex(entry.getCurrLogIndex());
     meta.setLastLogTerm(entry.getCurrLogTerm());
-    appendInternal(entries);
+
+    lock.writeLock().lock();
+    try{
+      putLogs(entries);
+      if(bufferedLogNum >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()){
+        flushLogBuffer();
+      }
+
+    }catch(BufferOverflowException e){
+      throw new IOException(
+          "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+              + "otherwise, please increase the JVM memory", e
+      );
+    }finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Put each log in entries into local buffer. If the buffer overflows, flush the buffer to the
+   * disk, and try to push the log again.
+   *
+   * @param entries logs to put to buffer
+   */
+  private void putLogs(List<Log> entries) {
+    for (Log log : entries) {
+      logBuffer.mark();
+      try {
+        logBuffer.put(log.serialize());
+      } catch (BufferOverflowException e) {
+        logger.info("Raft log buffer overflow!");
+        logBuffer.reset();
+        flushLogBuffer();
+        logBuffer.put(log.serialize());
+      }
+      bufferedLogNum++;
+    }
+  }
+
+  /**
+   * Flush current log buffer to the disk.
+   */
+  private void flushLogBuffer(){
+    lock.writeLock().lock();
+    try{
+      if(bufferedLogNum == 0){
+        return;
+      }
+      // write into disk
+      try {
+        checkStream();
+        ReadWriteIOUtils.writeWithoutSize(logBuffer, currentLogOutputStream);
+      } catch (IOException e) {
+        logger.error("Error in logs serialization: ", e);
+        return;
+      }
+      logBuffer.clear();
+      bufferedLogNum = 0;
+      logger.debug("End flushing log buffer.");
+    }finally {
+      lock.writeLock().unlock();
+    }
   }
 
   @Override
@@ -325,26 +393,6 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     return logFile;
   }
 
-  public void append(Log log) {
-    ByteBuffer data = log.serialize();
-    int totalSize = 0;
-    // write into disk
-    try {
-      checkStream();
-      totalSize = ReadWriteIOUtils.write(data, currentLogOutputStream);
-    } catch (IOException e) {
-      logger.error("Error in appending log {} ", log, e);
-    }
-
-    lock.writeLock().lock();
-    try {
-      logSizeDeque.addLast(totalSize);
-    } finally {
-      lock.writeLock().unlock();
-    }
-
-  }
-
   public void appendInternal(List<Log> logs) {
     int bufferSize = 0;
     List<ByteBuffer> bufferList = new ArrayList<>(logs.size());