You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/29 02:23:30 UTC
[incubator-iotdb] branch kyy updated: rename flushRaftLogThreshold
This is an automated email from the ASF dual-hosted git repository.
yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/kyy by this push:
new 376b177 rename flushRaftLogThreshold
376b177 is described below
commit 376b1774c8e41c0e1b69431c35675392e30f2da7
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Wed Jul 29 10:23:14 2020 +0800
rename flushRaftLogThreshold
---
.../serializable/SyncLogDequeSerializer.java | 30 ++++++++++++----------
1 file changed, 16 insertions(+), 14 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
index b77b33d..b54e07b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
@@ -85,6 +85,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private ByteBuffer logBuffer = ByteBuffer
.allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+ private int flushRaftLogThreshold = ClusterDescriptor.getInstance().getConfig()
+ .getFlushRaftLogThreshold();
+
private int bufferedLogNum = 0;
@@ -171,27 +174,25 @@ public class SyncLogDequeSerializer implements StableEntryManager {
meta.setCommitLogTerm(entry.getCurrLogTerm());
meta.setLastLogIndex(entry.getCurrLogIndex());
meta.setLastLogTerm(entry.getCurrLogTerm());
-
lock.writeLock().lock();
- try{
+ try {
putLogs(entries);
- if(bufferedLogNum >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()){
+ if (bufferedLogNum >= flushRaftLogThreshold) {
flushLogBuffer();
}
-
- }catch(BufferOverflowException e){
+ } catch (BufferOverflowException e) {
throw new IOException(
"Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ "otherwise, please increase the JVM memory", e
);
- }finally {
+ } finally {
lock.writeLock().unlock();
}
}
/**
- * Put each log in entries into local buffer. If the buffer overflows, flush the buffer to the
- * disk, and try to push the log again.
+ * Put each log in entries to local buffer. If the buffer overflows, flush the buffer to the disk,
+ * and try to push the log again.
*
* @param entries logs to put to buffer
*/
@@ -218,16 +219,17 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* Flush current log buffer to the disk.
*/
- private void flushLogBuffer(){
+ private void flushLogBuffer() {
lock.writeLock().lock();
- try{
- if(bufferedLogNum == 0){
+ try {
+ if (bufferedLogNum == 0) {
return;
}
// write into disk
try {
checkStream();
- ReadWriteIOUtils.writeWithoutSize(logBuffer, 0, logBuffer.position(), currentLogOutputStream);
+ ReadWriteIOUtils
+ .writeWithoutSize(logBuffer, 0, logBuffer.position(), currentLogOutputStream);
} catch (IOException e) {
logger.error("Error in logs serialization: ", e);
return;
@@ -235,7 +237,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
logBuffer.clear();
bufferedLogNum = 0;
logger.debug("End flushing log buffer.");
- }finally {
+ } finally {
lock.writeLock().unlock();
}
}
@@ -495,7 +497,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
public void removeFirst(int num) {
- if(bufferedLogNum > 0){
+ if (bufferedLogNum > 0) {
flushLogBuffer();
}
firstLogPosition += num;