You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/29 13:19:38 UTC
[incubator-iotdb] 01/01: flush periodically
This is an automated email from the ASF dual-hosted git repository.
yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 4a77450fc701d50a988b4d4dc9c197a6e0f0cac5
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Wed Jul 29 21:19:04 2020 +0800
flush periodically
---
.../iotdb/cluster/log/StableEntryManager.java | 5 +++++
.../iotdb/cluster/log/manage/RaftLogManager.java | 21 ++++++++++++++++---
.../serializable/SyncLogDequeSerializer.java | 24 +++++++++++++++++-----
3 files changed, 42 insertions(+), 8 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
index 38c6d96..164bfc5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/StableEntryManager.java
@@ -28,6 +28,11 @@ public interface StableEntryManager {
void append(List<Log> entries) throws IOException;
+
+ void flushLogBuffer();
+
+ void forceFlushLogBuffer();
+
void removeCompactedEntries(long index);
void setHardStateAndFlush(HardState state);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index d998f7d..748a261 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -100,6 +100,16 @@ abstract public class RaftLogManager {
.scheduleAtFixedRate(this::checkDeleteLog, logDeleteCheckIntervalSecond,
logDeleteCheckIntervalSecond,
TimeUnit.SECONDS);
+
+ /**
+ * flush log to file periodically
+ */
+ int logFlushTimeIntervalMS = ClusterDescriptor.getInstance().getConfig()
+ .getForceRaftLogPeriodInMS();
+ if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
+ executorService.scheduleAtFixedRate(this::flushLogPeriodically, logFlushTimeIntervalMS,
+ logFlushTimeIntervalMS, TimeUnit.MILLISECONDS);
+ }
}
abstract public Snapshot getSnapshot();
@@ -160,8 +170,7 @@ abstract public class RaftLogManager {
*
* @param index request entry index
* @return throw EntryCompactedException if index < dummyIndex, -1 if index > lastIndex or the
- * entry is compacted, otherwise
- * return the entry's term for given index
+ * entry is compacted, otherwise return the entry's term for given index
* @throws EntryCompactedException
*/
public long getTerm(long index) throws EntryCompactedException {
@@ -431,7 +440,7 @@ abstract public class RaftLogManager {
applyEntries(entries, ignoreExecutionExceptions);
} catch (TruncateCommittedEntryException e) {
logger.error("{}: Unexpected error:", name, e);
- } catch (IOException e){
+ } catch (IOException e) {
throw new LogExecutionException(e);
}
}
@@ -589,6 +598,12 @@ abstract public class RaftLogManager {
}
}
+ /**
+ * Flush the log buffer of the stable entry manager while holding this manager's monitor.
+ * This method is the task handed to the executor's scheduleAtFixedRate when raft log
+ * persistence is enabled, so it must never let an exception escape.
+ */
+ public void flushLogPeriodically() {
+ synchronized (this) {
+ try {
+ getStableEntryManager().flushLogBuffer();
+ } catch (Exception e) {
+ // An exception thrown out of a scheduleAtFixedRate task suppresses every subsequent
+ // execution; log it here so that the next period can retry the flush.
+ logger.error("Unexpected error when flushing raft log periodically:", e);
+ }
+ }
+ }
+
private void innerDeleteLog() {
long removeSize = committedEntryManager.getTotalSize() - minNumOfLogsInMem;
long compactIndex = committedEntryManager.getDummyIndex() + removeSize;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
index 6a3d529..61306b6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
@@ -216,16 +216,16 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
}
- /**
- * Flush current log buffer to the disk.
- */
- private void flushLogBuffer() {
+ @Override
+ public void flushLogBuffer() {
+
lock.writeLock().lock();
try {
if (bufferedLogNum == 0) {
return;
}
// write into disk
+ System.out.println(">>>>>>>>>>>>>>>>>>> flush log");
try {
checkStream();
ReadWriteIOUtils
@@ -243,6 +243,20 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
+ /**
+ * Flush the buffered logs through {@link #flushLogBuffer()} and then call
+ * {@code force(true)} on the channel of the current log output stream.
+ * An IOException raised while forcing is logged and not rethrown.
+ */
 @Override
+ public void forceFlushLogBuffer() {
+ flushLogBuffer();
+ lock.writeLock().lock();
+ try {
+ // flushLogBuffer() returns early when nothing is buffered, so the stream may still be
+ // null here (close() null-checks it as well); there is nothing to force in that case.
+ if (currentLogOutputStream != null) {
+ currentLogOutputStream.getChannel().force(true);
+ }
+ } catch (IOException e) {
+ logger.error("Error when force flushing logs serialization: ", e);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
public void setHardStateAndFlush(HardState state) {
this.state = state;
serializeMeta(meta);
@@ -616,7 +630,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
@Override
public void close() {
- flushLogBuffer();
+ forceFlushLogBuffer();
lock.writeLock().lock();
try {
if (currentLogOutputStream != null) {