You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/27 14:05:30 UTC
[incubator-iotdb] 01/01: append
This is an automated email from the ASF dual-hosted git repository.
yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit a66c8f5eb3dcfd9f5fe9283caf591ae42ba1efd7
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Mon Jul 27 22:04:45 2020 +0800
append
---
.../resources/conf/iotdb-cluster.properties | 16 ++++
.../apache/iotdb/cluster/config/ClusterConfig.java | 38 +++++++++
.../iotdb/cluster/config/ClusterDescriptor.java | 17 ++++
.../iotdb/cluster/log/StableEntryManager.java | 3 +-
.../serializable/SyncLogDequeSerializer.java | 92 ++++++++++++++++------
5 files changed, 143 insertions(+), 23 deletions(-)
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 57b1e54..fd8059a 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -102,5 +102,21 @@ enable_auto_create_schema=true
# Weak consistency do not synchronize with the leader and simply use the local data
consistency_level=mid
+# is raft log persistence enabled
+is_enable_raft_log_persistence=true
+
+# When a certain amount of raft log is reached, it will be flushed to disk
+# It is possible to lose at most flush_raft_log_threshold operations
+flush_raft_log_threshold=10000
+
+# The period at which the raft log is forced to be written to disk (in milliseconds)
+# If force_raft_log_period_in_ms = 0, the raft log is forced to be written to disk after each flush
+# Setting this parameter to 0 may slow down ingestion on slow disks.
+force_raft_log_period_in_ms=10
+
+raft_log_buffer_size=16777216
+
+
+
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index ebdda72..9d6d8b7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -99,6 +99,17 @@ public class ClusterConfig {
private boolean enableRaftLogPersistence = true;
+
+ private int flushRaftLogThreshold=10000;
+
+ private int forceRaftLogPeriodInMS=10;
+
+ /**
+ * Size of the raft log buffer in bytes. If raft log persistence is enabled and the size of a
+ * raft log is larger than this parameter, then the log cannot be buffered and is rejected.
+ */
+ private int raftLogBufferSize = 16 * 1024 * 1024;
+
/**
* consistency level, now three consistency levels are supported: strong, mid and weak. Strong
* consistency means the server will first try to synchronize with the leader to get the newest
@@ -109,6 +120,8 @@ public class ClusterConfig {
*/
private ConsistencyLevel consistencyLevel = ConsistencyLevel.MID_CONSISTENCY;
+
+
public int getSelectorNumOfClientPool() {
return selectorNumOfClientPool;
}
@@ -300,4 +313,29 @@ public class ClusterConfig {
public void setMaxNumOfLogsInMem(int maxNumOfLogsInMem) {
this.maxNumOfLogsInMem = maxNumOfLogsInMem;
}
+
+ /** Returns the size of the raft log buffer, in bytes. */
+ public int getRaftLogBufferSize() {
+ return raftLogBufferSize;
+ }
+
+ /** Sets the size of the raft log buffer, in bytes. */
+ public void setRaftLogBufferSize(int raftLogBufferSize) {
+ this.raftLogBufferSize = raftLogBufferSize;
+ }
+
+ /** Returns the amount of raft log that triggers a flush to disk (flush_raft_log_threshold). */
+ public int getFlushRaftLogThreshold() {
+ return flushRaftLogThreshold;
+ }
+
+ /** Sets the amount of raft log that triggers a flush to disk (flush_raft_log_threshold). */
+ public void setFlushRaftLogThreshold(int flushRaftLogThreshold) {
+ this.flushRaftLogThreshold = flushRaftLogThreshold;
+ }
+
+ /** Returns the period, in milliseconds, at which the raft log is forced to disk. */
+ public int getForceRaftLogPeriodInMS() {
+ return forceRaftLogPeriodInMS;
+ }
+
+ /** Sets the period, in milliseconds, at which the raft log is forced to disk. */
+ public void setForceRaftLogPeriodInMS(int forceRaftLogPeriodInMS) {
+ this.forceRaftLogPeriodInMS = forceRaftLogPeriodInMS;
+ }
+
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index a37d84c..75f65a6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -244,6 +244,23 @@ public class ClusterDescriptor {
.getProperty("enable_auto_create_schema",
String.valueOf(config.isEnableAutoCreateSchema()))));
+ config.setEnableRaftLogPersistence(
+ Boolean.parseBoolean(properties.getProperty("is_enable_raft_log_persistence",
+ String.valueOf(config.isEnableRaftLogPersistence()))));
+
+ config.setFlushRaftLogThreshold(Integer.parseInt(properties
+ .getProperty("flush_raft_log_threshold", String.valueOf(config.getFlushRaftLogThreshold())))
+ );
+
+ config.setForceRaftLogPeriodInMS(Integer.parseInt(properties
+ .getProperty("force_raft_log_period_in_ms",
+ String.valueOf(config.getForceRaftLogPeriodInMS())))
+ );
+
+ config.setRaftLogBufferSize(Integer.parseInt(properties
+ .getProperty("raft_log_buffer_size", String.valueOf(config.getRaftLogBufferSize())))
+ );
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
index 8edfd67..38c6d96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
@@ -19,13 +19,14 @@
package org.apache.iotdb.cluster.log;
+import java.io.IOException;
import java.util.List;
public interface StableEntryManager {
List<Log> getAllEntries();
- void append(List<Log> entries);
+ void append(List<Log> entries) throws IOException;
void removeCompactedEntries(long index);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
index ef31959..d116a45 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
@@ -23,6 +23,7 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
@@ -81,6 +82,12 @@ public class SyncLogDequeSerializer implements StableEntryManager {
// version controller
private VersionController versionController;
+ private ByteBuffer logBuffer = ByteBuffer
+ .allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+
+ private int bufferedLogNum = 0;
+
+
/**
* the lock uses when change the logSizeDeque
*/
@@ -158,13 +165,74 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
@Override
- public void append(List<Log> entries) {
+ public void append(List<Log> entries) throws IOException {
Log entry = entries.get(entries.size() - 1);
meta.setCommitLogIndex(entry.getCurrLogIndex());
meta.setCommitLogTerm(entry.getCurrLogTerm());
meta.setLastLogIndex(entry.getCurrLogIndex());
meta.setLastLogTerm(entry.getCurrLogTerm());
- appendInternal(entries);
+
+ lock.writeLock().lock();
+ try{
+ putLogs(entries);
+ if(bufferedLogNum >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()){
+ flushLogBuffer();
+ }
+
+ }catch(BufferOverflowException e){
+ throw new IOException(
+ "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ + "otherwise, please increase the JVM memory", e
+ );
+ }finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+  /**
+   * Put each log in entries into the local buffer. If a log does not fit into the space left in
+   * the buffer, the buffer is flushed to the disk first and the log is then put into the buffer.
+   *
+   * @param entries logs to put to buffer
+   * @throws BufferOverflowException if a log still does not fit into the buffer after the flush
+   *     attempt
+   */
+  private void putLogs(List<Log> entries) {
+    for (Log log : entries) {
+      // serialize once; the same bytes are used whether or not a flush is needed first
+      ByteBuffer serialized = log.serialize();
+      if (serialized.remaining() > logBuffer.remaining()) {
+        logger.info("Raft log buffer overflow!");
+        flushLogBuffer();
+      }
+      // put() transfers nothing and throws BufferOverflowException if the log is still too large
+      logBuffer.put(serialized);
+      bufferedLogNum++;
+    }
+  }
+
+  /**
+   * Flush current log buffer to the disk. Does nothing when no log is buffered. If writing
+   * fails, the error is logged and the buffered logs are kept in the buffer.
+   */
+  private void flushLogBuffer() {
+    lock.writeLock().lock();
+    try {
+      if (bufferedLogNum == 0) {
+        return;
+      }
+      checkStream();
+      // Write exactly the filled region [0, position) of the buffer. The buffer is in "put"
+      // mode here (never flipped), so position..limit is the unused tail, not the logs.
+      // NOTE(review): assumes currentLogOutputStream is a java.io.OutputStream - confirm.
+      currentLogOutputStream
+          .write(logBuffer.array(), logBuffer.arrayOffset(), logBuffer.position());
+      logBuffer.clear();
+      bufferedLogNum = 0;
+      logger.debug("End flushing log buffer.");
+    } catch (IOException e) {
+      // keep the buffer content and counter untouched so the logs are not dropped
+      logger.error("Error in logs serialization: ", e);
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
@Override
@@ -325,26 +393,6 @@ public class SyncLogDequeSerializer implements StableEntryManager {
return logFile;
}
- public void append(Log log) {
- ByteBuffer data = log.serialize();
- int totalSize = 0;
- // write into disk
- try {
- checkStream();
- totalSize = ReadWriteIOUtils.write(data, currentLogOutputStream);
- } catch (IOException e) {
- logger.error("Error in appending log {} ", log, e);
- }
-
- lock.writeLock().lock();
- try {
- logSizeDeque.addLast(totalSize);
- } finally {
- lock.writeLock().unlock();
- }
-
- }
-
public void appendInternal(List<Log> logs) {
int bufferSize = 0;
List<ByteBuffer> bufferList = new ArrayList<>(logs.size());