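You are viewing a plain-text version of this content. The canonical version is the original message in the commits@iotdb.apache.org mailing-list archive (its hyperlink was lost in this plain-text rendering).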
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/20 03:04:05 UTC
[iotdb] branch native_raft updated: add disk log compression
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 6b0f19c97a add disk log compression
6b0f19c97a is described below
commit 6b0f19c97aea1b5267766e746a196dce622cf14a
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Mar 20 11:06:03 2023 +0800
add disk log compression
---
.../consensus/natraft/protocol/RaftMember.java | 2 +-
.../log/appender/SlidingWindowLogAppender.java | 6 +-
.../protocol/log/dispatch/VotingLogList.java | 8 +-
.../protocol/log/manager/RaftLogManager.java | 26 ++-
.../manager/serialization/IndexFileDescriptor.java | 53 +++++
.../serialization/SyncLogDequeSerializer.java | 255 ++++++++++-----------
.../log/sequencing/SynchronousSequencer.java | 2 +-
.../iotdb/consensus/natraft/utils/Timer.java | 14 +-
8 files changed, 209 insertions(+), 157 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index bbbc5a11c4..4aef915e65 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -1222,7 +1222,7 @@ public class RaftMember {
e.setCurrLogIndex(lastIndex + 1);
e.setPrevTerm(lastTerm);
- logManager.append(Collections.singletonList(e));
+ logManager.append(Collections.singletonList(e), true);
votingEntry = LogUtils.buildVotingLog(e, this);
setNewNodes(newNodes);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index cd84e82df1..6157ab6ea1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -148,7 +148,7 @@ public class SlidingWindowLogAppender implements LogAppender {
}
}
if (success != -1) {
- moveWindowRightward(flushPos);
+ moveWindowRightward(flushPos, logs.get(logs.size() - 1).getCurrLogIndex());
}
result.status = Response.RESPONSE_STRONG_ACCEPT;
result.setLastLogIndex(firstPosPrevIndex);
@@ -156,13 +156,13 @@ public class SlidingWindowLogAppender implements LogAppender {
return success;
}
- private void moveWindowRightward(int step) {
+ private void moveWindowRightward(int step, long newIndex) {
System.arraycopy(logWindow, step, logWindow, 0, windowCapacity - step);
System.arraycopy(prevTerms, step, prevTerms, 0, windowCapacity - step);
for (int i = 1; i <= step; i++) {
logWindow[windowCapacity - i] = null;
}
- firstPosPrevIndex = logManager.getLastLogIndex();
+ firstPosPrevIndex = newIndex;
}
private void moveWindowLeftward(int step) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
index 15f845abbe..d19c6d33d3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
@@ -48,11 +48,11 @@ public class VotingLogList {
private boolean tryCommit(VotingEntry entry) {
RaftLogManager logManager = member.getLogManager();
-
- if (computeNewCommitIndex(entry)
- && logManager != null
- && newCommitIndex.get() > logManager.getCommitLogIndex()) {
+ boolean commitIndexUpdated = computeNewCommitIndex(entry);
+ if (commitIndexUpdated && newCommitIndex.get() > logManager.getCommitLogIndex()) {
try {
+ Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_BEFORE_COMMIT.add(
+ System.nanoTime() - entry.getEntry().createTime);
logManager.commitTo(newCommitIndex.get());
} catch (LogExecutionException e) {
logger.error("Fail to commit {}", newCommitIndex, e);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 2c3ffb488e..5f2c4f6f04 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -360,7 +360,7 @@ public abstract class RaftLogManager {
} else {
long offset = lastIndex + 1;
- append(entries.subList((int) (ci - offset), entries.size()));
+ append(entries.subList((int) (ci - offset), entries.size()), false);
}
return newLastIndex;
}
@@ -377,7 +377,7 @@ public abstract class RaftLogManager {
* @param appendingEntries appendingEntries
* @return the newly generated lastIndex
*/
- public long append(List<Entry> appendingEntries) {
+ public long append(List<Entry> appendingEntries, boolean isLeader) {
if (entries.isEmpty()) {
return getLastLogIndex();
}
@@ -410,11 +410,16 @@ public abstract class RaftLogManager {
}
}
- Object logUpdateCondition =
- getLogUpdateCondition(entries.get(entries.size() - 1).getCurrLogIndex());
- synchronized (logUpdateCondition) {
- logUpdateCondition.notifyAll();
+ if (!isLeader) {
+ // log update condition is to inform follower appending threads that the log is updated, and
+ // the leader does not concern it
+ Object logUpdateCondition =
+ getLogUpdateCondition(entries.get(entries.size() - 1).getCurrLogIndex());
+ synchronized (logUpdateCondition) {
+ logUpdateCondition.notifyAll();
+ }
}
+
return getLastLogIndex();
}
@@ -425,7 +430,7 @@ public abstract class RaftLogManager {
* @param term the entry's term which index is leaderCommit in leader's log module
* @return true or false
*/
- public synchronized boolean maybeCommit(long leaderCommit, long term) {
+ public boolean maybeCommit(long leaderCommit, long term) {
if (leaderCommit > commitIndex && matchTerm(term, leaderCommit)) {
try {
commitTo(leaderCommit);
@@ -615,7 +620,12 @@ public abstract class RaftLogManager {
return;
}
- removedCommitted(entries);
+ for (Entry entry : entries) {
+ if (entry.createTime != 0) {
+ Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_READY_COMMIT.add(
+ System.nanoTime() - entry.createTime);
+ }
+ }
checkCompaction(entries);
commitEntries(entries);
applyEntries(entries);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/IndexFileDescriptor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/IndexFileDescriptor.java
new file mode 100644
index 0000000000..3ec30ce1f9
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/IndexFileDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.SyncLogDequeSerializer.FILE_NAME_PART_LENGTH;
+
+public class IndexFileDescriptor {
+
+ private static final Logger logger = LoggerFactory.getLogger(IndexFileDescriptor.class);
+ File file;
+ long startIndex;
+ long endIndex;
+
+ public IndexFileDescriptor(File file) {
+ this.file = file;
+ String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
+ if (splits.length != FILE_NAME_PART_LENGTH) {
+ logger.error(
+ "file={} name should be in the following format: startLogIndex-endLogIndex-version-idx",
+ file.getAbsoluteFile());
+ }
+ this.startIndex = Long.parseLong(splits[0]);
+ this.endIndex = Long.parseLong(splits[1]);
+ }
+
+ public IndexFileDescriptor(File file, long startIndex, long endIndex) {
+ this.file = file;
+ this.startIndex = startIndex;
+ this.endIndex = endIndex;
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index d330f4421e..e348541ce2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -21,13 +21,15 @@ package org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.file.SystemFileFactory;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
import org.apache.iotdb.consensus.natraft.protocol.HardState;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.SyncLogDequeSerializer.VersionController.SimpleFileVersionController;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -73,7 +75,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private List<File> logDataFileList;
/** the log index files */
- private List<File> logIndexFileList;
+ private List<IndexFileDescriptor> logIndexFileList;
private LogParser parser = LogParser.getINSTANCE();
private File metaFile;
@@ -109,7 +111,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
*
* <p>for log index file: ${startLogIndex}-${endLogIndex}-{version}-idx
*/
- private static final int FILE_NAME_PART_LENGTH = 4;
+ static final int FILE_NAME_PART_LENGTH = 4;
private int maxRaftLogIndexSizeInMemory;
@@ -130,11 +132,13 @@ public class SyncLogDequeSerializer implements StableEntryManager {
*/
private long firstLogIndex = 0;
+ private long lastLogIndex = 0;
+
/**
- * the offset of the log's index, for example, the first value is the offset of index
- * ${firstLogIndex}, the second value is the offset of index ${firstLogIndex+1}
+ * the index and file offset of the log, for example, the first pair is the offset of index
+ * ${firstLogIndex}, the second pair is the offset of index ${firstLogIndex+1}
*/
- private List<Long> logIndexOffsetList;
+ private List<Pair<Long, Long>> logIndexOffsetList;
private static final int LOG_DELETE_CHECK_INTERVAL_SECOND = 5;
@@ -143,6 +147,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private volatile boolean isClosed = false;
private RaftConfig config;
+ private ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY);
+ private IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
private void initCommonProperties() {
logDataBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
@@ -177,18 +183,6 @@ public class SyncLogDequeSerializer implements StableEntryManager {
TimeUnit.SECONDS);
}
- /**
- * for log tools
- *
- * @param logPath log dir path
- */
- public SyncLogDequeSerializer(String logPath, RaftConfig config) {
- this.config = config;
- logDir = logPath + File.separator;
- initCommonProperties();
- initMetaAndLogFiles();
- }
-
/**
* log in disk is [size of log1 | log1 buffer] [size of log2 | log2 buffer]
*
@@ -286,24 +280,17 @@ public class SyncLogDequeSerializer implements StableEntryManager {
logDataBuffer.mark();
logIndexBuffer.mark();
ByteBuffer logData = log.serialize();
- int size = logData.remaining() + Integer.BYTES;
try {
- logDataBuffer.putInt(logData.remaining());
logDataBuffer.put(logData);
- logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
- logIndexOffsetList.add(offsetOfTheCurrentLogDataOutputStream);
- offsetOfTheCurrentLogDataOutputStream += size;
+ lastLogIndex = log.getCurrLogIndex();
} catch (BufferOverflowException e) {
- logger.info("Raft log buffer overflow!");
+ logger.debug("Raft log buffer overflow!");
logDataBuffer.reset();
logIndexBuffer.reset();
flushLogBuffer();
checkCloseCurrentFile(log.getCurrLogIndex() - 1);
- logDataBuffer.putInt(logData.capacity());
logDataBuffer.put(logData);
- logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
- logIndexOffsetList.add(offsetOfTheCurrentLogDataOutputStream);
- offsetOfTheCurrentLogDataOutputStream += size;
+ lastLogIndex = log.getCurrLogIndex();
}
}
}
@@ -350,26 +337,28 @@ public class SyncLogDequeSerializer implements StableEntryManager {
if (currentLogIndexOutputStream != null) {
currentLogIndexOutputStream.close();
- logger.info("{}: Closed a log index file {}", this, getCurrentLogIndexFile());
+ logger.info("{}: Closed a log index file {}", this, getCurrentLogIndexFileDescriptor());
currentLogIndexOutputStream = null;
- File currentLogIndexFile = getCurrentLogIndexFile();
+ IndexFileDescriptor currentLogIndexFile = getCurrentLogIndexFileDescriptor();
String newIndexFileName =
currentLogIndexFile
+ .file
.getName()
.replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(commitIndex));
File newCurrentLogIndexFile =
SystemFileFactory.INSTANCE.getFile(
- currentLogIndexFile.getParent() + File.separator + newIndexFileName);
- if (!currentLogIndexFile.renameTo(newCurrentLogIndexFile)) {
- logger.error("rename log index file={} failed", currentLogIndexFile.getAbsoluteFile());
+ currentLogIndexFile.file.getParent() + File.separator + newIndexFileName);
+ if (!currentLogIndexFile.file.renameTo(newCurrentLogIndexFile)) {
+ logger.error("rename log index file={} failed", currentLogIndexFile.file.getAbsoluteFile());
}
logger.debug(
"rename index file={} to file={}",
- currentLogIndexFile.getAbsoluteFile(),
+ currentLogIndexFile.file.getAbsoluteFile(),
newCurrentLogIndexFile.getAbsoluteFile());
- logIndexFileList.set(logIndexFileList.size() - 1, newCurrentLogIndexFile);
+ logIndexFileList.get(logIndexFileList.size() - 1).file = newCurrentLogIndexFile;
+ logIndexFileList.get(logIndexFileList.size() - 1).endIndex = commitIndex;
}
offsetOfTheCurrentLogDataOutputStream = 0;
@@ -386,10 +375,19 @@ public class SyncLogDequeSerializer implements StableEntryManager {
try {
checkStream();
// 1. write to the log data file
- ReadWriteIOUtils.writeWithoutSize(
- logDataBuffer, 0, logDataBuffer.position(), currentLogDataOutputStream);
+ byte[] compressed =
+ compressor.compress(
+ logDataBuffer.array(),
+ logDataBuffer.arrayOffset() + logDataBuffer.position(),
+ logDataBuffer.remaining());
+ ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
+ currentLogDataOutputStream.write(compressed);
+ logIndexOffsetList.add(new Pair<>(lastLogIndex, offsetOfTheCurrentLogDataOutputStream));
+ logIndexBuffer.putLong(lastLogIndex);
+ logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
ReadWriteIOUtils.writeWithoutSize(
logIndexBuffer, 0, logIndexBuffer.position(), currentLogIndexOutputStream);
+ offsetOfTheCurrentLogDataOutputStream += Integer.BYTES + compressed.length;
if (config.getFlushRaftLogThreshold() == 0) {
currentLogDataOutputStream.getChannel().force(true);
currentLogIndexOutputStream.getChannel().force(true);
@@ -483,7 +481,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
// sort by name before recover
logDataFileList.sort(this::comparePersistLogFileName);
- logIndexFileList.sort(this::comparePersistLogFileName);
+ logIndexFileList.sort(
+ (descriptor1, descriptor2) ->
+ comparePersistLogFileName(descriptor1.file, descriptor2.file));
// 3. recover the last log file in case of abnormal exit
recoverTheLastLogFile();
@@ -506,7 +506,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
logDataFileList.add(file);
break;
case LOG_INDEX_FILE_SUFFIX:
- logIndexFileList.add(file);
+ logIndexFileList.add(new IndexFileDescriptor(file));
break;
default:
logger.error("unknown file type={}", logFileType);
@@ -565,19 +565,21 @@ public class SyncLogDequeSerializer implements StableEntryManager {
return;
}
- File lastIndexFile = logIndexFileList.get(logIndexFileList.size() - 1);
- long endIndex = Long.parseLong(lastIndexFile.getName().split(FILE_NAME_SEPARATOR)[1]);
+ IndexFileDescriptor lastIndexFileDescriptor = logIndexFileList.get(logIndexFileList.size() - 1);
+ long endIndex = lastIndexFileDescriptor.endIndex;
boolean success = true;
if (endIndex != Long.MAX_VALUE) {
- logger.info("last log index file={} no need to recover", lastIndexFile.getAbsoluteFile());
+ logger.info(
+ "last log index file={} no need to recover",
+ lastIndexFileDescriptor.file.getAbsoluteFile());
} else {
- success = recoverTheLastLogIndexFile(lastIndexFile);
+ success = recoverTheLastLogIndexFile(lastIndexFileDescriptor);
}
if (!success) {
logger.error(
"recover log index file failed, clear all logs in disk, {}",
- lastIndexFile.getAbsoluteFile());
+ lastIndexFileDescriptor.file.getAbsoluteFile());
forceDeleteAllLogFiles();
clearFirstLogIndex();
return;
@@ -603,9 +605,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private boolean recoverTheLastLogDataFile(File file) {
String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
long startIndex = Long.parseLong(splits[0]);
- Pair<File, Pair<Long, Long>> fileStartAndEndIndex = getLogIndexFile(startIndex);
- if (fileStartAndEndIndex.right.left == startIndex) {
- long endIndex = fileStartAndEndIndex.right.right;
+ IndexFileDescriptor descriptor = getLogIndexFile(startIndex);
+ if (descriptor.startIndex == startIndex) {
+ long endIndex = descriptor.endIndex;
String newDataFileName =
file.getName().replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(endIndex));
File newLogDataFile =
@@ -620,30 +622,31 @@ public class SyncLogDequeSerializer implements StableEntryManager {
return false;
}
- private boolean recoverTheLastLogIndexFile(File file) {
- logger.debug("start to recover the last log index file={}", file.getAbsoluteFile());
- String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
- long startIndex = Long.parseLong(splits[0]);
+ private boolean recoverTheLastLogIndexFile(IndexFileDescriptor descriptor) {
+ logger.debug("start to recover the last log index file={}", descriptor.file.getAbsoluteFile());
+ long startIndex = descriptor.startIndex;
int longLength = 8;
byte[] bytes = new byte[longLength];
- int totalCount = 0;
+ long index;
long offset = 0;
- try (FileInputStream fileInputStream = new FileInputStream(file);
+ long endIndex = 0;
+ try (FileInputStream fileInputStream = new FileInputStream(descriptor.file);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream)) {
firstLogIndex = startIndex;
while (bufferedInputStream.read(bytes) != -1) {
+ index = BytesUtils.bytesToLong(bytes);
+ bufferedInputStream.read(bytes);
offset = BytesUtils.bytesToLong(bytes);
- logIndexOffsetList.add(offset);
- totalCount++;
+ logIndexOffsetList.add(new Pair<>(index, offset));
+ endIndex = index;
}
} catch (IOException e) {
logger.error("recover log index file failed,", e);
}
- long endIndex = startIndex + totalCount - 1;
logger.debug(
"recover log index file={}, startIndex={}, endIndex={}",
- file.getAbsoluteFile(),
+ descriptor.file.getAbsoluteFile(),
startIndex,
endIndex);
@@ -658,15 +661,21 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
if (endIndex >= startIndex) {
String newIndexFileName =
- file.getName().replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(endIndex));
+ descriptor
+ .file
+ .getName()
+ .replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(endIndex));
File newLogIndexFile =
- SystemFileFactory.INSTANCE.getFile(file.getParent() + File.separator + newIndexFileName);
- if (!file.renameTo(newLogIndexFile)) {
- logger.error("rename log index file={} failed when recover", file.getAbsoluteFile());
+ SystemFileFactory.INSTANCE.getFile(
+ descriptor.file.getParent() + File.separator + newIndexFileName);
+ if (!descriptor.file.renameTo(newLogIndexFile)) {
+ logger.error(
+ "rename log index file={} failed when recover", descriptor.file.getAbsoluteFile());
}
- logIndexFileList.set(logIndexFileList.size() - 1, newLogIndexFile);
+ descriptor.file = newLogIndexFile;
+ descriptor.endIndex = endIndex;
} else {
- logger.error("recover log index file failed,{}", file.getAbsoluteFile());
+ logger.error("recover log index file failed,{}", descriptor.file.getAbsoluteFile());
return false;
}
return true;
@@ -733,8 +742,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
if (currentLogIndexOutputStream == null) {
- currentLogIndexOutputStream = new FileOutputStream(getCurrentLogIndexFile(), true);
- logger.info("{}: Opened a new index data file: {}", this, getCurrentLogIndexFile());
+ currentLogIndexOutputStream =
+ new FileOutputStream(getCurrentLogIndexFileDescriptor().file, true);
+ logger.info("{}: Opened a new index data file: {}", this, getCurrentLogIndexFileDescriptor());
}
}
@@ -766,7 +776,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
logger.warn("Cannot create new log index file {}", logDataFile);
}
logDataFileList.add(logDataFile);
- logIndexFileList.add(logIndexFile);
+ logIndexFileList.add(new IndexFileDescriptor(logIndexFile, startLogIndex, endLogIndex));
} finally {
lock.unlock();
}
@@ -776,7 +786,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
return logDataFileList.get(logDataFileList.size() - 1);
}
- private File getCurrentLogIndexFile() {
+ private IndexFileDescriptor getCurrentLogIndexFileDescriptor() {
return logIndexFileList.get(logIndexFileList.size() - 1);
}
@@ -1025,7 +1035,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
File logDataFile = null;
- File logIndexFile = null;
+ IndexFileDescriptor logIndexFile = null;
lock.lock();
try {
@@ -1036,18 +1046,18 @@ public class SyncLogDequeSerializer implements StableEntryManager {
return false;
}
Files.delete(logDataFile.toPath());
- Files.delete(logIndexFile.toPath());
+ Files.delete(logIndexFile.file.toPath());
logDataFileList.remove(0);
logIndexFileList.remove(0);
logger.debug(
"delete date file={}, index file={}",
logDataFile.getAbsoluteFile(),
- logIndexFile.getAbsoluteFile());
+ logIndexFile.file.getAbsoluteFile());
} catch (IOException e) {
logger.error(
"delete file failed, data file={}, index file={}",
logDataFile.getAbsoluteFile(),
- logIndexFile.getAbsoluteFile());
+ logIndexFile.file.getAbsoluteFile());
return false;
} finally {
lock.unlock();
@@ -1095,9 +1105,11 @@ public class SyncLogDequeSerializer implements StableEntryManager {
startIndex = 0;
}
- long newEndIndex = endIndex;
+ long newEndIndex;
if (endIndex - startIndex > maxNumberOfLogsPerFetchOnDisk) {
newEndIndex = startIndex + maxNumberOfLogsPerFetchOnDisk;
+ } else {
+ newEndIndex = endIndex;
}
logger.debug(
"intend to get logs between[{}, {}], actually get logs between[{},{}]",
@@ -1122,6 +1134,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
for (Pair<File, Pair<Long, Long>> pair : logDataFileAndOffsetList) {
result.addAll(getLogsFromOneLogDataFile(pair.left, pair.right));
}
+ long finalStartIndex = startIndex;
+ result.removeIf(
+ e -> !(finalStartIndex <= e.getCurrLogIndex() && e.getCurrLogIndex() <= newEndIndex));
return result;
} finally {
@@ -1147,16 +1162,11 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
// 1. first find in memory
if (logIndex >= firstLogIndex) {
- int arrayIndex = (int) (logIndex - firstLogIndex);
- if (arrayIndex < logIndexOffsetList.size()) {
- offset = logIndexOffsetList.get(arrayIndex);
- logger.debug(
- "found the offset in memory, logIndex={}, firstLogIndex={}, logIndexOffsetList size={}, offset={}",
- logIndex,
- firstLogIndex,
- logIndexOffsetList.size(),
- offset);
- return offset;
+ for (Pair<Long, Long> indexOffset : logIndexOffsetList) {
+ // end index
+ if (indexOffset.left >= logIndex) {
+ return indexOffset.right;
+ }
}
}
@@ -1167,36 +1177,30 @@ public class SyncLogDequeSerializer implements StableEntryManager {
logIndexOffsetList.size());
// 2. second read the log index file
- Pair<File, Pair<Long, Long>> fileWithStartAndEndIndex = getLogIndexFile(logIndex);
- if (fileWithStartAndEndIndex == null) {
+ IndexFileDescriptor descriptor = getLogIndexFile(logIndex);
+ if (descriptor == null) {
return -1;
}
- File file = fileWithStartAndEndIndex.left;
- Pair<Long, Long> startAndEndIndex = fileWithStartAndEndIndex.right;
+ File file = descriptor.file;
logger.debug(
"start to read the log index file={} for log index={}, file size={}",
file.getAbsoluteFile(),
logIndex,
file.length());
+ long endIndex;
try (FileInputStream fileInputStream = new FileInputStream(file);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream)) {
- long bytesNeedToSkip = (logIndex - startAndEndIndex.left) * (Long.BYTES);
- long bytesActuallySkip = bufferedInputStream.skip(bytesNeedToSkip);
- logger.debug("skip {} bytes when read file={}", bytesActuallySkip, file.getAbsoluteFile());
- if (bytesNeedToSkip != bytesActuallySkip) {
- logger.error(
- "read file={} failed, should skip={}, actually skip={}",
- file.getAbsoluteFile(),
- bytesNeedToSkip,
- bytesActuallySkip);
- return -1;
+ while (bufferedInputStream.available() > 0) {
+ endIndex = ReadWriteIOUtils.readLong(bufferedInputStream);
+ offset = ReadWriteIOUtils.readLong(bufferedInputStream);
+ if (endIndex >= logIndex) {
+ return offset;
+ }
}
- offset = ReadWriteIOUtils.readLong(bufferedInputStream);
- return offset;
} catch (IOException e) {
logger.error("can not read the log index file={}", file.getAbsoluteFile(), e);
- return -1;
}
+ return -1;
}
/**
@@ -1270,16 +1274,10 @@ public class SyncLogDequeSerializer implements StableEntryManager {
* second pair's first value is the file's start log index. the second pair's second value is
* the file's end log index. null if not found
*/
- public Pair<File, Pair<Long, Long>> getLogIndexFile(long startIndex) {
- for (File file : logIndexFileList) {
- String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
- if (splits.length != FILE_NAME_PART_LENGTH) {
- logger.error(
- "file={} name should be in the following format: startLogIndex-endLogIndex-version-idx",
- file.getAbsoluteFile());
- }
- if (Long.parseLong(splits[0]) <= startIndex && startIndex <= Long.parseLong(splits[1])) {
- return new Pair<>(file, new Pair<>(Long.parseLong(splits[0]), Long.parseLong(splits[1])));
+ public IndexFileDescriptor getLogIndexFile(long startIndex) {
+ for (IndexFileDescriptor descriptor : logIndexFileList) {
+ if (descriptor.startIndex <= startIndex && startIndex <= descriptor.endIndex) {
+ return descriptor;
}
}
logger.debug("can not found the log index file for startIndex={}", startIndex);
@@ -1351,11 +1349,14 @@ public class SyncLogDequeSerializer implements StableEntryManager {
file.getAbsoluteFile(),
currentReadOffset,
startAndEndOffset.right);
- int logSize = ReadWriteIOUtils.readInt(bufferedInputStream);
- Entry e = null;
- e = parser.parse(ByteBuffer.wrap(ReadWriteIOUtils.readBytes(bufferedInputStream, logSize)));
- result.add(e);
- currentReadOffset = currentReadOffset + Integer.BYTES + logSize;
+ int logBlockSize = ReadWriteIOUtils.readInt(bufferedInputStream);
+ byte[] bytes = ReadWriteIOUtils.readBytes(bufferedInputStream, logBlockSize);
+ ByteBuffer uncompressed = ByteBuffer.wrap(unCompressor.uncompress(bytes));
+ while (uncompressed.remaining() > 0) {
+ Entry e = parser.parse(uncompressed);
+ result.add(e);
+ }
+ currentReadOffset = currentReadOffset + Integer.BYTES + logBlockSize;
}
} catch (UnknownLogTypeException e) {
logger.error("Unknown log detected ", e);
@@ -1365,31 +1366,6 @@ public class SyncLogDequeSerializer implements StableEntryManager {
return result;
}
- @TestOnly
- public void setLogDataBuffer(ByteBuffer logDataBuffer) {
- this.logDataBuffer = logDataBuffer;
- }
-
- @TestOnly
- public void setMaxRaftLogPersistDataSizePerFile(int maxRaftLogPersistDataSizePerFile) {
- this.maxRaftLogPersistDataSizePerFile = maxRaftLogPersistDataSizePerFile;
- }
-
- @TestOnly
- public void setMaxNumberOfPersistRaftLogFiles(int maxNumberOfPersistRaftLogFiles) {
- this.maxNumberOfPersistRaftLogFiles = maxNumberOfPersistRaftLogFiles;
- }
-
- @TestOnly
- public List<File> getLogDataFileList() {
- return logDataFileList;
- }
-
- @TestOnly
- public List<File> getLogIndexFileList() {
- return logIndexFileList;
- }
-
/**
* VersionController manages the version(a monotonically increasing long) of a storage group. We
* define that each memtable flush, data deletion, or data update will generate a new version of
@@ -1399,6 +1375,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
* be guaranteed by the caller.
*/
public interface VersionController {
+
/**
* Get the next version number.
*
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
index 97df15c2aa..7c59049bad 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
@@ -70,7 +70,7 @@ public class SynchronousSequencer implements LogSequencer {
e.createTime = System.nanoTime();
// logDispatcher will serialize log, and set log size, and we will use the size after it
- logManager.append(Collections.singletonList(e));
+ logManager.append(Collections.singletonList(e), true);
votingEntry = LogUtils.buildVotingLog(e, member);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index 619bde93d6..3a35b9dab6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -284,9 +284,21 @@ public class Timer {
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_SENDER_LOG_FROM_CREATE_TO_BEFORE_COMMIT(
+ LOG_DISPATCHER,
+ "from create to before commit",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_SENDER_LOG_FROM_CREATE_TO_READY_COMMIT(
+ LOG_DISPATCHER,
+ "from create to ready commit",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT(
LOG_DISPATCHER,
- "from create to commit",
+ "from create to committed",
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),