You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/19 15:47:36 UTC
[iotdb] branch master updated: [IOTDB-3791] Avoid redundant serialization and deserialization in multi-leader (#6655)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1c39d956b1 [IOTDB-3791] Avoid redundant serialization and deserialization in multi-leader (#6655)
1c39d956b1 is described below
commit 1c39d956b113c690f1d9e36598f55a704b7f2383
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Tue Jul 19 23:47:30 2022 +0800
[IOTDB-3791] Avoid redundant serialization and deserialization in multi-leader (#6655)
---
.../common/request/IndexedConsensusRequest.java | 21 +-
.../request/MultiLeaderConsensusRequest.java | 34 +-
.../iotdb/consensus/config/MultiLeaderConfig.java | 2 +-
.../multileader/MultiLeaderServerImpl.java | 9 +-
.../multileader/logdispatcher/LogDispatcher.java | 9 +-
.../service/MultiLeaderRPCServiceProcessor.java | 9 +-
.../multileader/wal/ConsensusReqReader.java | 20 --
.../multileader/util/FakeConsensusReqReader.java | 19 --
.../consensus/multileader/util/TestEntry.java | 8 +-
.../multileader/util/TestStateMachine.java | 29 +-
.../consensus/statemachine/BaseStateMachine.java | 4 +
.../statemachine/DataRegionStateMachine.java | 21 +-
.../mpp/plan/planner/plan/node/PlanNodeType.java | 22 +-
.../plan/planner/plan/node/write/InsertNode.java | 7 +
.../planner/plan/node/write/InsertRowNode.java | 84 ++++-
.../planner/plan/node/write/InsertTabletNode.java | 56 +++-
.../java/org/apache/iotdb/db/tools/WalChecker.java | 27 +-
.../iotdb/db/wal/buffer/IWALByteBufferView.java | 3 +
.../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 98 +++---
.../org/apache/iotdb/db/wal/buffer/WALEntry.java | 123 +++----
.../apache/iotdb/db/wal/buffer/WALEntryType.java | 14 +-
.../apache/iotdb/db/wal/buffer/WALEntryValue.java | 5 +-
.../apache/iotdb/db/wal/buffer/WALInfoEntry.java | 107 ++++++
.../{SignalWALEntry.java => WALSignalEntry.java} | 47 ++-
.../java/org/apache/iotdb/db/wal/io/LogWriter.java | 9 +-
.../apache/iotdb/db/wal/io/WALByteBufReader.java | 95 ++++++
.../org/apache/iotdb/db/wal/io/WALMetaData.java | 96 ++++++
.../java/org/apache/iotdb/db/wal/io/WALReader.java | 46 +--
.../java/org/apache/iotdb/db/wal/io/WALWriter.java | 44 +++
.../org/apache/iotdb/db/wal/node/WALFakeNode.java | 13 -
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 319 ++++--------------
.../iotdb/db/wal/recover/WALNodeRecoverTask.java | 19 +-
.../iotdb/db/wal/recover/WALRecoverWriter.java | 63 ++++
.../plan/node/write/InsertRowNodeSerdeTest.java | 2 +-
.../plan/node/write/InsertTabletNodeSerdeTest.java | 2 +-
.../org/apache/iotdb/db/tools/WalCheckerTest.java | 15 +-
.../iotdb/db/wal/buffer/WALBufferCommonTest.java | 2 +-
.../org/apache/iotdb/db/wal/io/WALFileTest.java | 21 +-
.../iotdb/db/wal/node/ConsensusReqReaderTest.java | 367 ++++++++++-----------
.../db/wal/recover/WALRecoverManagerTest.java | 11 +-
.../file/UnsealedTsFileRecoverPerformerTest.java | 9 +-
.../iotdb/db/wal/utils/WALByteBufferForTest.java | 5 +
.../src/main/thrift/mutlileader.thrift | 1 +
43 files changed, 1121 insertions(+), 796 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 0eaade43f5..de7ac07250 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.common.request;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Objects;
/** only used for multi-leader consensus. */
@@ -32,26 +33,26 @@ public class IndexedConsensusRequest implements IConsensusRequest {
private final long safelyDeletedSearchIndex;
- private final IConsensusRequest request;
+ private final List<IConsensusRequest> requests;
- public IndexedConsensusRequest(long searchIndex, IConsensusRequest request) {
- this(searchIndex, ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX, request);
+ public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
+ this(searchIndex, ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX, requests);
}
public IndexedConsensusRequest(
- long searchIndex, long safelyDeletedSearchIndex, IConsensusRequest request) {
+ long searchIndex, long safelyDeletedSearchIndex, List<IConsensusRequest> requests) {
this.searchIndex = searchIndex;
this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
- this.request = request;
+ this.requests = requests;
}
@Override
public ByteBuffer serializeToByteBuffer() {
- return request.serializeToByteBuffer();
+ throw new UnsupportedOperationException();
}
- public IConsensusRequest getRequest() {
- return request;
+ public List<IConsensusRequest> getRequests() {
+ return requests;
}
public long getSearchIndex() {
@@ -73,11 +74,11 @@ public class IndexedConsensusRequest implements IConsensusRequest {
IndexedConsensusRequest that = (IndexedConsensusRequest) o;
return searchIndex == that.searchIndex
&& safelyDeletedSearchIndex == that.safelyDeletedSearchIndex
- && Objects.equals(request, that.request);
+ && requests.equals(that.requests);
}
@Override
public int hashCode() {
- return Objects.hash(searchIndex, safelyDeletedSearchIndex, request);
+ return Objects.hash(searchIndex, safelyDeletedSearchIndex, requests);
}
}
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/MultiLeaderConsensusRequest.java
similarity index 53%
copy from thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
copy to consensus/src/main/java/org/apache/iotdb/consensus/common/request/MultiLeaderConsensusRequest.java
index 5e2836c5a6..10c761743d 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/MultiLeaderConsensusRequest.java
@@ -17,22 +17,26 @@
* under the License.
*/
-include "common.thrift"
-namespace java org.apache.iotdb.consensus.multileader.thrift
+package org.apache.iotdb.consensus.common.request;
-struct TLogBatch {
- 1: required binary data
-}
+import java.nio.ByteBuffer;
-struct TSyncLogReq {
- 1: required common.TConsensusGroupId consensusGroupId
- 2: required list<TLogBatch> batches
-}
+/**
+ * Represents a sync-log request from MultiLeaderConsensus whose payload is in the WAL format.
+ * It is kept separate from ByteBufferConsensusRequest because the two use different serialization
+ * formats, and DataRegionStateMachine must distinguish them to pick the right deserializer for
+ * the PlanNode carried in the ByteBuffer.
+ */
+public class MultiLeaderConsensusRequest implements IConsensusRequest {
-struct TSyncLogRes {
- 1: required list<common.TSStatus> status
-}
+ private final ByteBuffer byteBuffer;
+
+ public MultiLeaderConsensusRequest(ByteBuffer byteBuffer) {
+ this.byteBuffer = byteBuffer;
+ }
-service MultiLeaderConsensusIService {
- TSyncLogRes syncLog(TSyncLogReq req)
-}
\ No newline at end of file
+ @Override
+ public ByteBuffer serializeToByteBuffer() {
+ return byteBuffer;
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 06cc2e453c..e4f66f557b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -249,7 +249,7 @@ public class MultiLeaderConfig {
public static class Builder {
private int maxPendingRequestNumPerNode = 200;
private int maxRequestPerBatch = 40;
- private int maxPendingBatch = 6;
+ private int maxPendingBatch = 1;
private int maxWaitingTimeForAccumulatingBatchInMs = 500;
private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 2c21beba3e..a0439bf8c9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
@@ -45,6 +44,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -181,12 +181,13 @@ public class MultiLeaderServerImpl {
public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
IConsensusRequest request) {
- return new IndexedConsensusRequest(index.incrementAndGet(), request);
+ return new IndexedConsensusRequest(index.incrementAndGet(), Collections.singletonList(request));
}
public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
- ByteBufferConsensusRequest request) {
- return new IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX, request);
+ IConsensusRequest request) {
+ return new IndexedConsensusRequest(
+ ConsensusReqReader.DEFAULT_SEARCH_INDEX, Collections.singletonList(request));
}
/**
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 0a65148fe3..8ab54482f1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -337,7 +338,9 @@ public class LogDispatcher {
IndexedConsensusRequest data = walEntryiterator.next();
currentIndex = data.getSearchIndex();
iteratorIndex = currentIndex;
- logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
+ for (IConsensusRequest innerRequest : data.getRequests()) {
+ logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), true));
+ }
if (currentIndex == maxIndex - 1) {
break;
}
@@ -347,7 +350,9 @@ public class LogDispatcher {
private void constructBatchIndexedFromConsensusRequest(
IndexedConsensusRequest request, List<TLogBatch> logBatches) {
- logBatches.add(new TLogBatch(request.serializeToByteBuffer()));
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), false));
+ }
}
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 196bf120ff..b83c900892 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.multileader.service;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
@@ -30,7 +31,6 @@ import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,8 +50,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
}
@Override
- public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> resultHandler)
- throws TException {
+ public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> resultHandler) {
try {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -75,7 +74,9 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
impl.getStateMachine()
.write(
impl.buildIndexedConsensusRequestForRemoteRequest(
- new ByteBufferConsensusRequest(batch.data))));
+ batch.isFromWAL()
+ ? new MultiLeaderConsensusRequest(batch.data)
+ : new ByteBufferConsensusRequest(batch.data))));
}
}
logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
index ea462924e4..b3ad6ec09f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
@@ -19,11 +19,9 @@
package org.apache.iotdb.consensus.multileader.wal;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import java.util.Iterator;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -38,24 +36,6 @@ public interface ConsensusReqReader {
void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex);
- /**
- * Gets the consensus request at the specified position.
- *
- * @param index index of the consensus request to return
- * @return the consensus request at the specified position, null if the request doesn't exist
- */
- IConsensusRequest getReq(long index);
-
- /**
- * Gets the consensus requests from the specified start position.
- *
- * @param startIndex index of the start consensus request
- * @param num number of consensus requests to return, the number of actual returned consensus
- * requests may less than this value
- * @return the consensus requests from the specified start position
- */
- List<IConsensusRequest> getReqs(long startIndex, int num);
-
/**
* Gets the consensus requests iterator from the specified start position.
*
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
index a7f7134940..334d5a4fc6 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
@@ -20,11 +20,9 @@
package org.apache.iotdb.consensus.multileader.util;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -39,23 +37,6 @@ public class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
@Override
public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {}
- @Override
- public IConsensusRequest getReq(long index) {
- synchronized (requestSets) {
- for (IndexedConsensusRequest indexedConsensusRequest : requestSets.getRequestSet()) {
- if (indexedConsensusRequest.getSearchIndex() == index) {
- return indexedConsensusRequest;
- }
- }
- return null;
- }
- }
-
- @Override
- public List<IConsensusRequest> getReqs(long startIndex, int num) {
- return null;
- }
-
@Override
public ReqIterator getReqIterator(long startIndex) {
return new FakeConsensusReqIterator(startIndex);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java
index 214af5a8d6..874aa9b7f1 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.consensus.multileader.util;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import java.io.DataOutputStream;
@@ -28,14 +28,18 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-public class TestEntry implements IConsensusRequest {
+public class TestEntry extends MultiLeaderConsensusRequest {
private final int num;
private final Peer peer;
public TestEntry(int num, Peer peer) {
+ super(ByteBuffer.allocate(Integer.BYTES));
this.num = num;
this.peer = peer;
+ ByteBuffer buffer = super.serializeToByteBuffer();
+ buffer.putInt(num);
+ buffer.clear();
}
@Override
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
index 1a9507da07..eab940cf87 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
@@ -23,16 +23,18 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -46,7 +48,14 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
public Set<TestEntry> getData() {
Set<TestEntry> data = new HashSet<>();
- requestSets.getRequestSet().forEach(x -> data.add((TestEntry) x.getRequest()));
+ requestSets
+ .getRequestSet()
+ .forEach(
+ x -> {
+ for (IConsensusRequest request : x.getRequests()) {
+ data.add((TestEntry) request);
+ }
+ });
return data;
}
@@ -59,17 +68,15 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
@Override
public TSStatus write(IConsensusRequest request) {
synchronized (requestSets) {
- IConsensusRequest innerRequest = ((IndexedConsensusRequest) request).getRequest();
- if (innerRequest instanceof ByteBufferConsensusRequest) {
+ IndexedConsensusRequest indexedConsensusRequest = (IndexedConsensusRequest) request;
+ List<IConsensusRequest> transformedRequest = new ArrayList<>();
+ for (IConsensusRequest innerRequest : indexedConsensusRequest.getRequests()) {
ByteBuffer buffer = innerRequest.serializeToByteBuffer();
- requestSets.add(
- new IndexedConsensusRequest(
- ((IndexedConsensusRequest) request).getSearchIndex(),
- new TestEntry(buffer.getInt(), Peer.deserialize(buffer))),
- false);
- } else {
- requestSets.add(((IndexedConsensusRequest) request), true);
+ transformedRequest.add(new TestEntry(buffer.getInt(), Peer.deserialize(buffer)));
}
+ requestSets.add(
+ new IndexedConsensusRequest(indexedConsensusRequest.getSearchIndex(), transformedRequest),
+ indexedConsensusRequest.getSearchIndex() != ConsensusReqReader.DEFAULT_SEARCH_INDEX);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 6d8dc5f62b..32333f26e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -22,9 +22,11 @@ package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.wal.buffer.WALEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +52,8 @@ public abstract class BaseStateMachine implements IStateMachine, IStateMachine.E
PlanNode node;
if (request instanceof ByteBufferConsensusRequest) {
node = PlanNodeType.deserialize(request.serializeToByteBuffer());
+ } else if (request instanceof MultiLeaderConsensusRequest) {
+ node = WALEntry.deserializeInsertNode(request.serializeToByteBuffer());
} else if (request instanceof PlanNode) {
node = (PlanNode) request;
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 8ed5bbb7cd..e5d9c2b6f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -98,19 +98,30 @@ public class DataRegionStateMachine extends BaseStateMachine {
@Override
public TSStatus write(IConsensusRequest request) {
+ TSStatus status;
PlanNode planNode;
try {
if (request instanceof IndexedConsensusRequest) {
+ status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
IndexedConsensusRequest indexedConsensusRequest = (IndexedConsensusRequest) request;
- planNode = getPlanNode(indexedConsensusRequest.getRequest());
- if (planNode instanceof InsertNode) {
- ((InsertNode) planNode)
- .setSearchIndex(((IndexedConsensusRequest) request).getSearchIndex());
+ for (IConsensusRequest innerRequest : indexedConsensusRequest.getRequests()) {
+ planNode = getPlanNode(innerRequest);
+ if (planNode instanceof InsertNode) {
+ ((InsertNode) planNode)
+ .setSearchIndex(((IndexedConsensusRequest) request).getSearchIndex());
+ }
+ TSStatus subStatus = write(planNode);
+ if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ status.setCode(subStatus.getCode());
+ status.setMessage(subStatus.getMessage());
+ }
+ status.addToSubStatus(subStatus);
}
} else {
planNode = getPlanNode(request);
+ status = write(planNode);
}
- return write(planNode);
+ return status;
} catch (IllegalArgumentException e) {
logger.error(e.getMessage(), e);
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index d31db09d34..4c176d83b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.plan.planner.plan.node;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesSchemaScanNode;
@@ -127,6 +126,8 @@ public enum PlanNodeType {
NODE_PATHS_COUNT((short) 49),
INTERNAL_CREATE_TIMESERIES((short) 50);
+ public static final int BYTES = Short.BYTES;
+
private final short nodeType;
PlanNodeType(short nodeType) {
@@ -141,14 +142,25 @@ public enum PlanNodeType {
ReadWriteIOUtils.write(nodeType, stream);
}
- public static PlanNode deserialize(DataInputStream stream)
- throws IOException, IllegalPathException {
+ public static PlanNode deserializeFromWAL(DataInputStream stream) throws IOException {
short nodeType = stream.readShort();
switch (nodeType) {
case 13:
- return InsertTabletNode.deserialize(stream);
+ return InsertTabletNode.deserializeFromWAL(stream);
+ case 14:
+ return InsertRowNode.deserializeFromWAL(stream);
+ default:
+ throw new IllegalArgumentException("Invalid node type: " + nodeType);
+ }
+ }
+
+ public static PlanNode deserializeFromWAL(ByteBuffer buffer) {
+ short nodeType = buffer.getShort();
+ switch (nodeType) {
+ case 13:
+ return InsertTabletNode.deserializeFromWAL(buffer);
case 14:
- return InsertRowNode.deserialize(stream);
+ return InsertRowNode.deserializeFromWAL(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 9b4fe72f0c..f5b5bddb52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -203,6 +203,13 @@ public abstract class InsertNode extends WritePlanNode {
measurements[i] = measurementSchemas[i].getMeasurementId();
}
}
+
+ protected void deserializeMeasurementSchemas(ByteBuffer buffer) {
+ for (int i = 0; i < measurementSchemas.length; i++) {
+ measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
+ measurements[i] = measurementSchemas[i].getMeasurementId();
+ }
+ }
// endregion
public TRegionReplicaSet getRegionReplicaSet() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index ef03fcd046..883fcc30c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -569,6 +569,10 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
return size;
}
+ /**
+ * Compared with {@link #serialize(ByteBuffer)}, this additionally writes the search index but
+ * omits isNeedInferType.
+ */
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
buffer.putShort((short) PlanNodeType.INSERT_ROW.ordinal());
@@ -629,19 +633,22 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
}
/** Deserialize from wal */
- public static InsertRowNode deserialize(DataInputStream stream)
- throws IOException, IllegalPathException {
+ public static InsertRowNode deserializeFromWAL(DataInputStream stream) throws IOException {
// we do not store plan node id in wal entry
InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
insertNode.setSearchIndex(stream.readLong());
insertNode.setTime(stream.readLong());
- insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(stream)));
- insertNode.deserializeMeasurementsAndValues(stream);
+ try {
+ insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(stream)));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
+ }
+ insertNode.deserializeMeasurementsAndValuesFromWAL(stream);
return insertNode;
}
- void deserializeMeasurementsAndValues(DataInputStream stream) throws IOException {
+ void deserializeMeasurementsAndValuesFromWAL(DataInputStream stream) throws IOException {
int measurementSize = stream.readInt();
measurements = new String[measurementSize];
@@ -650,13 +657,13 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
dataTypes = new TSDataType[measurementSize];
values = new Object[measurementSize];
- fillDataTypesAndValues(stream);
+ fillDataTypesAndValuesFromWAL(stream);
isAligned = stream.readByte() == 1;
}
/** Make sure the dataTypes and values have been created before calling this */
- public void fillDataTypesAndValues(DataInputStream stream) throws IOException {
+ public void fillDataTypesAndValuesFromWAL(DataInputStream stream) throws IOException {
for (int i = 0; i < dataTypes.length; i++) {
byte typeNum = stream.readByte();
if (typeNum == TYPE_NULL) {
@@ -687,6 +694,69 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
}
}
}
+
+ /** Deserialize from wal */
+ public static InsertRowNode deserializeFromWAL(ByteBuffer buffer) {
+ // we do not store plan node id in wal entry
+ InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
+ insertNode.setSearchIndex(buffer.getLong());
+ insertNode.setTime(buffer.getLong());
+ try {
+ insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(buffer)));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
+ }
+ insertNode.deserializeMeasurementsAndValuesFromWAL(buffer);
+
+ return insertNode;
+ }
+
+ void deserializeMeasurementsAndValuesFromWAL(ByteBuffer buffer) {
+ int measurementSize = buffer.getInt();
+
+ measurements = new String[measurementSize];
+ measurementSchemas = new MeasurementSchema[measurementSize];
+ deserializeMeasurementSchemas(buffer);
+
+ dataTypes = new TSDataType[measurementSize];
+ values = new Object[measurementSize];
+ fillDataTypesAndValuesFromWAL(buffer);
+
+ isAligned = buffer.get() == 1;
+ }
+
+ /** Make sure the dataTypes and values have been created before calling this */
+ public void fillDataTypesAndValuesFromWAL(ByteBuffer buffer) {
+ for (int i = 0; i < dataTypes.length; i++) {
+ byte typeNum = buffer.get();
+ if (typeNum == TYPE_NULL) {
+ continue;
+ }
+ dataTypes[i] = TSDataType.values()[typeNum];
+ switch (dataTypes[i]) {
+ case BOOLEAN:
+ values[i] = ReadWriteIOUtils.readBool(buffer);
+ break;
+ case INT32:
+ values[i] = ReadWriteIOUtils.readInt(buffer);
+ break;
+ case INT64:
+ values[i] = ReadWriteIOUtils.readLong(buffer);
+ break;
+ case FLOAT:
+ values[i] = ReadWriteIOUtils.readFloat(buffer);
+ break;
+ case DOUBLE:
+ values[i] = ReadWriteIOUtils.readDouble(buffer);
+ break;
+ case TEXT:
+ values[i] = ReadWriteIOUtils.readBinary(buffer);
+ break;
+ default:
+ throw new RuntimeException("Unsupported data type:" + dataTypes[i]);
+ }
+ }
+ }
// endregion
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index c4c447c039..a9e64c9db9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -780,6 +780,10 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
return size;
}
+ /**
+ * Compared with {@link this#serialize(ByteBuffer)}, more info: search index and data types, less
+ * info: isNeedInferType
+ */
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
serializeToWAL(buffer, 0, rowCount);
@@ -894,17 +898,20 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
/** Deserialize from wal */
- public static InsertTabletNode deserialize(DataInputStream stream)
- throws IllegalPathException, IOException {
+ public static InsertTabletNode deserializeFromWAL(DataInputStream stream) throws IOException {
// we do not store plan node id in wal entry
InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
- insertNode.subDeserialize(stream);
+ insertNode.subDeserializeFromWAL(stream);
return insertNode;
}
- private void subDeserialize(DataInputStream stream) throws IllegalPathException, IOException {
+ private void subDeserializeFromWAL(DataInputStream stream) throws IOException {
searchIndex = stream.readLong();
- devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
+ try {
+ devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
+ } catch (IllegalPathException e) {
+ throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e);
+ }
int measurementSize = stream.readInt();
measurements = new String[measurementSize];
@@ -929,6 +936,45 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
QueryDataSetUtils.readTabletValuesFromStream(stream, dataTypes, measurementSize, rowCount);
isAligned = stream.readByte() == 1;
}
+
+ /** Deserialize from wal (ByteBuffer variant); the plan node id is not stored, so it is empty. */
+ public static InsertTabletNode deserializeFromWAL(ByteBuffer buffer) {
+ // we do not store plan node id in wal entry
+ InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
+ insertNode.subDeserializeFromWAL(buffer);
+ return insertNode;
+ }
+
+  /**
+   * Reads the fields of this node from the body of a wal entry, in order: search index, device
+   * path, measurement schemas (which also carry the data types), row count, timestamps, optional
+   * bitmaps, column values and the aligned flag.
+   *
+   * @throws IllegalArgumentException if the stored device path is not a legal path
+   */
+  private void subDeserializeFromWAL(ByteBuffer buffer) {
+    searchIndex = buffer.getLong();
+    try {
+      devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer));
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e);
+    }
+
+    int measurementSize = buffer.getInt();
+    measurements = new String[measurementSize];
+    measurementSchemas = new MeasurementSchema[measurementSize];
+    deserializeMeasurementSchemas(buffer);
+
+    // data types are serialized in measurement schemas
+    dataTypes = new TSDataType[measurementSize];
+    for (int i = 0; i < measurementSize; i++) {
+      dataTypes[i] = measurementSchemas[i].getType();
+    }
+
+    rowCount = buffer.getInt();
+    // readTimesFromBuffer returns a new array, so pre-allocating times here would be wasted
+    times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount);
+
+    boolean hasBitMaps = BytesUtils.byteToBool(buffer.get());
+    if (hasBitMaps) {
+      bitMaps = QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize, rowCount);
+    }
+    columns =
+        QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes, measurementSize, rowCount);
+    isAligned = buffer.get() == 1;
+  }
// endregion
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java b/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
index 9c4da773e2..116ee01b4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.tools;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.exception.SystemCheckException;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALEntryType;
import org.apache.iotdb.db.wal.utils.WALFileUtils;
import org.slf4j.Logger;
@@ -30,13 +30,11 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
-import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -89,26 +87,14 @@ public class WalChecker {
}
private boolean checkFile(File walFile) {
- int totalSize = 0;
try (DataInputStream logStream =
new DataInputStream(new BufferedInputStream(new FileInputStream(walFile)))) {
while (logStream.available() > 0) {
WALEntry walEntry = WALEntry.deserialize(logStream);
- totalSize += walEntry.serializedSize();
- if (walEntry.getValue() instanceof InsertTabletNode) {
- InsertTabletNode insertNode = (InsertTabletNode) walEntry.getValue();
- System.err.printf(
- "searchIndex: %s, timestamp: %s%n",
- insertNode.getSearchIndex(), Arrays.toString(insertNode.getTimes()));
+ if (walEntry.getType() == WALEntryType.WAL_FILE_INFO_END_MARKER) {
+ return true;
}
}
- } catch (EOFException e) {
- if (totalSize == walFile.length()) {
- return true;
- } else {
- logger.error("{} fails the check because", walFile, e);
- return false;
- }
} catch (FileNotFoundException e) {
logger.debug("Wal file doesn't exist, skipping");
return true;
@@ -130,7 +116,12 @@ public class WalChecker {
/** @param args walRootDirectory */
public static void main(String[] args) throws SystemCheckException {
- WalChecker checker = new WalChecker("/Users/heimingz/Desktop/0");
+ if (args.length < 1) {
+ logger.error("No enough args: require the walRootDirectory");
+ return;
+ }
+
+ WalChecker checker = new WalChecker(args[0]);
List<File> files = checker.doCheck();
report(files);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALByteBufferView.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALByteBufferView.java
index 977ac51dfc..f78355be54 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALByteBufferView.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALByteBufferView.java
@@ -49,4 +49,7 @@ public interface IWALByteBufferView {
/** Like {@link ByteBuffer#putDouble(double)}. */
void putDouble(double value);
+
+ /** Like {@link ByteBuffer#position()}. */
+ int position();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index e14ba4943d..4d4d710eb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
+import org.apache.iotdb.db.wal.io.WALMetaData;
import org.apache.iotdb.db.wal.utils.WALFileStatus;
import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
@@ -35,7 +36,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
@@ -46,6 +46,8 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static org.apache.iotdb.db.wal.node.WALNode.DEFAULT_SEARCH_INDEX;
+
/**
* This buffer guarantees the concurrent safety and uses double buffers mechanism to accelerate
* writes and avoid waiting for buffer syncing to disk.
@@ -53,7 +55,7 @@ import java.util.concurrent.locks.ReentrantLock;
public class WALBuffer extends AbstractWALBuffer {
private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private static final int WAL_BUFFER_SIZE = config.getWalBufferSize();
+ private static final int HALF_WAL_BUFFER_SIZE = config.getWalBufferSize() / 2;
private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity();
/** whether close method is called */
@@ -102,8 +104,8 @@ public class WALBuffer extends AbstractWALBuffer {
private void allocateBuffers() {
try {
- workingBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);
- idleBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);
+ workingBuffer = ByteBuffer.allocateDirect(HALF_WAL_BUFFER_SIZE);
+ idleBuffer = ByteBuffer.allocateDirect(HALF_WAL_BUFFER_SIZE);
} catch (OutOfMemoryError e) {
logger.error("Fail to allocate wal node-{}'s buffer because out of memory.", identifier, e);
close();
@@ -129,13 +131,18 @@ public class WALBuffer extends AbstractWALBuffer {
}
// region Task of serializeThread
+ /**
+ * Carries the extra info that the serializeThread collects for one batch over to the
+ * syncBufferThread, which consumes it in SyncBufferTask.
+ */
+ private static class SerializeInfo {
+ /** size and search index of each info entry serialized in this batch (see handleInfoEntry) */
+ final WALMetaData metaData = new WALMetaData();
+ /** flush listeners of the serialized info entries, notified by SyncBufferTask after fsync */
+ final List<WALFlushListener> fsyncListeners = new LinkedList<>();
+ /** listener of a ROLL_WAL_LOG_WRITER_SIGNAL handled in this batch, null if none */
+ WALFlushListener rollWALFileWriterListener = null;
+ }
+
/** This task serializes WALEntry to workingBuffer and will call fsync at last. */
private class SerializeTask implements Runnable {
- private final IWALByteBufferView byteBufferVew = new ByteBufferView();
- private final List<WALFlushListener> fsyncListeners = new LinkedList<>();
-
+ private final ByteBufferView byteBufferVew = new ByteBufferView();
+ private final SerializeInfo info = new SerializeInfo();
private int batchSize = 0;
- private WALFlushListener rollWALFileWriterListener = null;
@Override
public void run() {
@@ -183,8 +190,7 @@ public class WALBuffer extends AbstractWALBuffer {
// call fsync at last and set fsyncListeners
if (batchSize > 0) {
- fsyncWorkingBuffer(
- currentSearchIndex, currentFileStatus, fsyncListeners, rollWALFileWriterListener);
+ fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
}
}
@@ -194,13 +200,13 @@ public class WALBuffer extends AbstractWALBuffer {
*/
private boolean handleWALEntry(WALEntry walEntry) {
if (walEntry.isSignal()) {
- return handleSignalEntry((SignalWALEntry) walEntry);
+ return handleSignalEntry((WALSignalEntry) walEntry);
}
boolean success = handleInfoEntry(walEntry);
if (success) {
++batchSize;
- fsyncListeners.add(walEntry.getWalFlushListener());
+ info.fsyncListeners.add(walEntry.getWalFlushListener());
}
return false;
}
@@ -211,8 +217,11 @@ public class WALBuffer extends AbstractWALBuffer {
* @return true if serialization is successful.
*/
private boolean handleInfoEntry(WALEntry walEntry) {
+ int size = byteBufferVew.position();
try {
walEntry.serialize(byteBufferVew);
+ size = byteBufferVew.position() - size;
+ logger.debug("wal entry size is: {}", size);
} catch (Exception e) {
logger.error(
"Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
@@ -220,14 +229,17 @@ public class WALBuffer extends AbstractWALBuffer {
return false;
}
// update search index
+ long searchIndex = DEFAULT_SEARCH_INDEX;
if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
|| walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
InsertNode insertNode = (InsertNode) walEntry.getValue();
if (insertNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
+ searchIndex = insertNode.getSearchIndex();
currentSearchIndex = insertNode.getSearchIndex();
currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
}
}
+ info.metaData.add(size, searchIndex);
return true;
}
@@ -235,13 +247,12 @@ public class WALBuffer extends AbstractWALBuffer {
* @return true if fsyncWorkingBuffer has been called, which means this serialization task
* should be ended.
*/
- private boolean handleSignalEntry(SignalWALEntry signalWALEntry) {
- switch (signalWALEntry.getSignalType()) {
+ private boolean handleSignalEntry(WALSignalEntry walSignalEntry) {
+ switch (walSignalEntry.getType()) {
case ROLL_WAL_LOG_WRITER_SIGNAL:
logger.debug("Handle roll log writer signal for wal node-{}.", identifier);
- rollWALFileWriterListener = signalWALEntry.getWalFlushListener();
- fsyncWorkingBuffer(
- currentSearchIndex, currentFileStatus, fsyncListeners, rollWALFileWriterListener);
+ info.rollWALFileWriterListener = walSignalEntry.getWalFlushListener();
+ fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
return true;
case CLOSE_SIGNAL:
logger.debug(
@@ -250,8 +261,7 @@ public class WALBuffer extends AbstractWALBuffer {
walEntries.size());
boolean dataExists = batchSize > 0;
if (dataExists) {
- fsyncWorkingBuffer(
- currentSearchIndex, currentFileStatus, fsyncListeners, rollWALFileWriterListener);
+ fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
}
return dataExists;
default:
@@ -265,6 +275,8 @@ public class WALBuffer extends AbstractWALBuffer {
* serializeThread and this class is only used by serializeThread.
*/
private class ByteBufferView implements IWALByteBufferView {
+ private int flushedBytesNum = 0;
+
private void ensureEnoughSpace(int bytesNum) {
if (workingBuffer.remaining() < bytesNum) {
rollBuffer();
@@ -272,6 +284,7 @@ public class WALBuffer extends AbstractWALBuffer {
}
private void rollBuffer() {
+ flushedBytesNum += workingBuffer.position();
syncWorkingBuffer(currentSearchIndex, currentFileStatus);
}
@@ -333,6 +346,11 @@ public class WALBuffer extends AbstractWALBuffer {
ensureEnoughSpace(Double.BYTES);
workingBuffer.putDouble(value);
}
+
+ /**
+ * Total number of bytes written through this view: the bytes already handed off by rollBuffer
+ * plus the bytes currently in the working buffer. handleInfoEntry takes the difference between
+ * two calls to measure the serialized size of one entry.
+ *
+ * <p>NOTE(review): fsyncWorkingBuffer switches buffers without going through rollBuffer, so the
+ * absolute value is not a file offset; only deltas taken within one entry are meaningful.
+ */
+ @Override
+ public int position() {
+ return flushedBytesNum + workingBuffer.position();
+ }
}
/** Notice: this method only called when buffer is exhausted by SerializeTask. */
@@ -343,15 +361,9 @@ public class WALBuffer extends AbstractWALBuffer {
}
/** Notice: this method only called at the last of SerializeTask. */
- private void fsyncWorkingBuffer(
- long searchIndex,
- WALFileStatus fileStatus,
- List<WALFlushListener> fsyncListeners,
- WALFlushListener rollWALFileWriterListener) {
+ private void fsyncWorkingBuffer(long searchIndex, WALFileStatus fileStatus, SerializeInfo info) {
switchWorkingBufferToFlushing();
- syncBufferThread.submit(
- new SyncBufferTask(
- searchIndex, fileStatus, true, fsyncListeners, rollWALFileWriterListener));
+ syncBufferThread.submit(new SyncBufferTask(searchIndex, fileStatus, true, info));
currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
}
@@ -383,24 +395,18 @@ public class WALBuffer extends AbstractWALBuffer {
private final long searchIndex;
private final WALFileStatus fileStatus;
private final boolean forceFlag;
- private final List<WALFlushListener> fsyncListeners;
- private final WALFlushListener rollWALFileWriterListener;
+ private final SerializeInfo info;
public SyncBufferTask(long searchIndex, WALFileStatus fileStatus, boolean forceFlag) {
- this(searchIndex, fileStatus, forceFlag, null, null);
+ this(searchIndex, fileStatus, forceFlag, null);
}
public SyncBufferTask(
- long searchIndex,
- WALFileStatus fileStatus,
- boolean forceFlag,
- List<WALFlushListener> fsyncListeners,
- WALFlushListener rollWALFileWriterListener) {
+ long searchIndex, WALFileStatus fileStatus, boolean forceFlag, SerializeInfo info) {
this.searchIndex = searchIndex;
this.fileStatus = fileStatus;
this.forceFlag = forceFlag;
- this.fsyncListeners = fsyncListeners == null ? Collections.emptyList() : fsyncListeners;
- this.rollWALFileWriterListener = rollWALFileWriterListener;
+ this.info = info == null ? new SerializeInfo() : info;
}
@Override
@@ -409,7 +415,7 @@ public class WALBuffer extends AbstractWALBuffer {
// flush buffer to os
try {
- currentWALFileWriter.write(syncingBuffer);
+ currentWALFileWriter.write(syncingBuffer, info.metaData);
} catch (Throwable e) {
logger.error(
"Fail to sync wal node-{}'s buffer, change system mode to read-only.", identifier, e);
@@ -427,32 +433,32 @@ public class WALBuffer extends AbstractWALBuffer {
"Fail to fsync wal node-{}'s log writer, change system mode to read-only.",
identifier,
e);
- for (WALFlushListener fsyncListener : fsyncListeners) {
+ for (WALFlushListener fsyncListener : info.fsyncListeners) {
fsyncListener.fail(e);
}
config.setReadOnly(true);
}
// notify all waiting listeners
- for (WALFlushListener fsyncListener : fsyncListeners) {
+ for (WALFlushListener fsyncListener : info.fsyncListeners) {
fsyncListener.succeed();
}
}
// try to roll log writer
- if (rollWALFileWriterListener != null
+ if (info.rollWALFileWriterListener != null
|| (forceFlag && currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
try {
rollLogWriter(searchIndex, currentWALFileWriter.getWalFileStatus());
- if (rollWALFileWriterListener != null) {
- rollWALFileWriterListener.succeed();
+ if (info.rollWALFileWriterListener != null) {
+ info.rollWALFileWriterListener.succeed();
}
} catch (IOException e) {
logger.error(
"Fail to roll wal node-{}'s log writer, change system mode to read-only.",
identifier,
e);
- if (rollWALFileWriterListener != null) {
- rollWALFileWriterListener.fail(e);
+ if (info.rollWALFileWriterListener != null) {
+ info.rollWALFileWriterListener.fail(e);
}
config.setReadOnly(true);
}
@@ -502,7 +508,7 @@ public class WALBuffer extends AbstractWALBuffer {
if (serializeThread != null) {
// add close signal WALEntry to notify serializeThread
try {
- walEntries.put(new SignalWALEntry(SignalWALEntry.SignalType.CLOSE_SIGNAL));
+ walEntries.put(new WALSignalEntry(WALEntryType.CLOSE_SIGNAL));
} catch (InterruptedException e) {
logger.error("Fail to put CLOSE_SIGNAL to walEntries.", e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
index 0c9140c778..27ae7f867d 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
@@ -19,10 +19,9 @@
package org.apache.iotdb.db.wal.buffer;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
@@ -31,56 +30,34 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.utils.SerializedSize;
-import org.apache.iotdb.db.wal.utils.WALMode;
import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.DataInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Objects;
/**
* WALEntry is the basic element of .wal file, including type, memTable id, and specific
* value(physical plan or memTable snapshot).
*/
-public class WALEntry implements SerializedSize {
- private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
- /** wal entry type 1 byte, memTable id 8 bytes */
- private static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + Long.BYTES;
+public abstract class WALEntry implements SerializedSize {
+ private static final Logger logger = LoggerFactory.getLogger(WALEntry.class);
/** type of value */
- private final WALEntryType type;
+ protected final WALEntryType type;
/** memTable id */
- private final long memTableId;
+ protected final long memTableId;
/** value(physical plan or memTable snapshot) */
- private final WALEntryValue value;
- /** extra info for InsertTabletPlan type value */
- private TabletInfo tabletInfo;
-
+ protected final WALEntryValue value;
/**
* listen whether this WALEntry has been written to the filesystem, null iff this WALEntry is
* deserialized from .wal file
*/
- private final WALFlushListener walFlushListener;
-
- public WALEntry(long memTableId, WALEntryValue value) {
- this(memTableId, value, config.getWalMode() == WALMode.SYNC);
- if (value instanceof InsertTabletPlan) {
- tabletInfo = new TabletInfo(0, ((InsertTabletPlan) value).getRowCount());
- } else if (value instanceof InsertTabletNode) {
- tabletInfo = new TabletInfo(0, ((InsertTabletNode) value).getRowCount());
- }
- }
-
- public WALEntry(long memTableId, InsertTabletPlan value, int tabletStart, int tabletEnd) {
- this(memTableId, value, config.getWalMode() == WALMode.SYNC);
- tabletInfo = new TabletInfo(tabletStart, tabletEnd);
- }
-
- public WALEntry(long memTableId, InsertTabletNode value, int tabletStart, int tabletEnd) {
- this(memTableId, value, config.getWalMode() == WALMode.SYNC);
- tabletInfo = new TabletInfo(tabletStart, tabletEnd);
- }
+ protected final WALFlushListener walFlushListener;
public WALEntry(long memTableId, WALEntryValue value, boolean wait) {
this.memTableId = memTableId;
@@ -103,38 +80,14 @@ public class WALEntry implements SerializedSize {
walFlushListener = new WALFlushListener(wait);
}
- private WALEntry(WALEntryType type, long memTableId, WALEntryValue value) {
+ protected WALEntry(WALEntryType type, long memTableId, WALEntryValue value, boolean wait) {
this.type = type;
this.memTableId = memTableId;
this.value = value;
- this.walFlushListener = null;
+ this.walFlushListener = new WALFlushListener(wait);
}
- @Override
- public int serializedSize() {
- return FIXED_SERIALIZED_SIZE + value.serializedSize();
- }
-
- public void serialize(IWALByteBufferView buffer) {
- buffer.put(type.getCode());
- buffer.putLong(memTableId);
- switch (type) {
- case INSERT_TABLET_PLAN:
- ((InsertTabletPlan) value)
- .serializeToWAL(buffer, tabletInfo.tabletStart, tabletInfo.tabletEnd);
- break;
- case INSERT_TABLET_NODE:
- ((InsertTabletNode) value)
- .serializeToWAL(buffer, tabletInfo.tabletStart, tabletInfo.tabletEnd);
- break;
- case INSERT_ROW_PLAN:
- case INSERT_ROW_NODE:
- case DELETE_PLAN:
- case MEMORY_TABLE_SNAPSHOT:
- value.serializeToWAL(buffer);
- break;
- }
- }
+ public abstract void serialize(IWALByteBufferView buffer);
public static WALEntry deserialize(DataInputStream stream)
throws IllegalPathException, IOException {
@@ -144,6 +97,15 @@ public class WALEntry implements SerializedSize {
throw new IOException("unrecognized wal entry type " + typeNum);
}
+ // handle signal
+ switch (type) {
+ case CLOSE_SIGNAL:
+ case ROLL_WAL_LOG_WRITER_SIGNAL:
+ case WAL_FILE_INFO_END_MARKER:
+ return new WALSignalEntry(type);
+ }
+
+ // handle info
long memTableId = stream.readLong();
WALEntryValue value = null;
switch (type) {
@@ -160,13 +122,30 @@ public class WALEntry implements SerializedSize {
value = AbstractMemTable.Factory.create(stream);
break;
case INSERT_ROW_NODE:
- value = (InsertRowNode) PlanNodeType.deserialize(stream);
+ value = (InsertRowNode) PlanNodeType.deserializeFromWAL(stream);
break;
case INSERT_TABLET_NODE:
- value = (InsertTabletNode) PlanNodeType.deserialize(stream);
+ value = (InsertTabletNode) PlanNodeType.deserializeFromWAL(stream);
break;
}
- return new WALEntry(type, memTableId, value);
+ return new WALInfoEntry(type, memTableId, value);
+ }
+
+ /**
+ * This deserialization method is only for multi-leader consensus and just deserializes
+ * InsertRowNode and InsertTabletNode
+ */
+ public static PlanNode deserializeInsertNode(ByteBuffer buffer) {
+ logger.debug(
+ "buffer capacity is: {}, limit is: {}, position is: {}",
+ buffer.capacity(),
+ buffer.limit(),
+ buffer.position());
+ // wal entry type
+ buffer.get();
+ // memtable id
+ buffer.getLong();
+ return PlanNodeType.deserializeFromWAL(buffer);
}
@Override
@@ -202,19 +181,5 @@ public class WALEntry implements SerializedSize {
return walFlushListener;
}
- public boolean isSignal() {
- return false;
- }
-
- private static class TabletInfo {
- /** start row of insert tablet */
- private final int tabletStart;
- /** end row of insert tablet */
- private final int tabletEnd;
-
- public TabletInfo(int tabletStart, int tabletEnd) {
- this.tabletStart = tabletStart;
- this.tabletEnd = tabletEnd;
- }
- }
+ public abstract boolean isSignal();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryType.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryType.java
index 9a6c8eacd4..d6483db413 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryType.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryType.java
@@ -18,8 +18,9 @@
*/
package org.apache.iotdb.db.wal.buffer;
-/** Type of {@link WALEntry} */
+/** Type of {@link WALEntry}, including info type and signal type */
public enum WALEntryType {
+ // region info entry type
/** {@link org.apache.iotdb.db.qp.physical.crud.InsertRowPlan} */
INSERT_ROW_PLAN((byte) 0),
/** {@link org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan} */
@@ -32,7 +33,16 @@ public enum WALEntryType {
INSERT_ROW_NODE((byte) 4),
/** {@link org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode} */
INSERT_TABLET_NODE((byte) 5),
- ;
+ // endregion
+ // region signal entry type
+ /** signal wal buffer has been closed */
+ CLOSE_SIGNAL(Byte.MIN_VALUE),
+ /** signal wal buffer to roll wal log writer */
+ ROLL_WAL_LOG_WRITER_SIGNAL((byte) (Byte.MIN_VALUE + 1)),
+ /** mark the wal file info part ends */
+ WAL_FILE_INFO_END_MARKER((byte) (Byte.MIN_VALUE + 2)),
+// endregion
+;
private final byte code;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryValue.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryValue.java
index 95b3ddc20d..df9a24c308 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryValue.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryValue.java
@@ -23,8 +23,9 @@ import org.apache.iotdb.db.utils.SerializedSize;
/** A class implements this interface can be written into .wal file. */
public interface WALEntryValue extends SerializedSize {
/**
- * Serialize using {@link IWALByteBufferView}, which encapsulates some actions to deal with {@link
- * java.nio.BufferOverflowException} occurs in {@link java.nio.ByteBuffer}.
+ * Serialize using {@link org.apache.iotdb.db.wal.buffer.IWALByteBufferView}, which encapsulates
+ * some actions to deal with {@link java.nio.BufferOverflowException} occurs in {@link
+ * java.nio.ByteBuffer}.
*/
void serializeToWAL(IWALByteBufferView buffer);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALInfoEntry.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALInfoEntry.java
new file mode 100644
index 0000000000..58635400b7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALInfoEntry.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.wal.utils.WALMode;
+
+/**
+ * This entry class stores info for persistence: the wal entry type, the memTable id and the value
+ * (physical plan, plan node or memTable snapshot) written into the .wal file.
+ */
+public class WALInfoEntry extends WALEntry {
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** wal entry type 1 byte, memTable id 8 bytes */
+  public static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + Long.BYTES;
+
+  /**
+   * extra info for tablet values (InsertTabletPlan / InsertTabletNode): the row range to
+   * serialize; null for every other value type
+   */
+  private TabletInfo tabletInfo;
+
+  public WALInfoEntry(long memTableId, WALEntryValue value, boolean wait) {
+    super(memTableId, value, wait);
+    // default to the whole tablet so serialize never dereferences a null range for tablet values
+    this.tabletInfo = wholeTabletInfo(value);
+  }
+
+  public WALInfoEntry(long memTableId, WALEntryValue value) {
+    this(memTableId, value, config.getWalMode() == WALMode.SYNC);
+  }
+
+  public WALInfoEntry(long memTableId, InsertTabletPlan value, int tabletStart, int tabletEnd) {
+    this(memTableId, value, config.getWalMode() == WALMode.SYNC);
+    tabletInfo = new TabletInfo(tabletStart, tabletEnd);
+  }
+
+  public WALInfoEntry(long memTableId, InsertTabletNode value, int tabletStart, int tabletEnd) {
+    this(memTableId, value, config.getWalMode() == WALMode.SYNC);
+    tabletInfo = new TabletInfo(tabletStart, tabletEnd);
+  }
+
+  /** Used by {@link WALEntry#deserialize}, where the entry type has already been read. */
+  WALInfoEntry(WALEntryType type, long memTableId, WALEntryValue value) {
+    super(type, memTableId, value, false);
+    this.tabletInfo = wholeTabletInfo(value);
+  }
+
+  /** Returns a range covering every row of a tablet value, or null for non-tablet values. */
+  private static TabletInfo wholeTabletInfo(WALEntryValue value) {
+    if (value instanceof InsertTabletPlan) {
+      return new TabletInfo(0, ((InsertTabletPlan) value).getRowCount());
+    } else if (value instanceof InsertTabletNode) {
+      return new TabletInfo(0, ((InsertTabletNode) value).getRowCount());
+    }
+    return null;
+  }
+
+  @Override
+  public int serializedSize() {
+    return FIXED_SERIALIZED_SIZE + (value == null ? 0 : value.serializedSize());
+  }
+
+  @Override
+  public void serialize(IWALByteBufferView buffer) {
+    buffer.put(type.getCode());
+    buffer.putLong(memTableId);
+    switch (type) {
+      case INSERT_TABLET_PLAN:
+        // tablets may be split across several entries, so only the recorded row range is written
+        ((InsertTabletPlan) value)
+            .serializeToWAL(buffer, tabletInfo.tabletStart, tabletInfo.tabletEnd);
+        break;
+      case INSERT_TABLET_NODE:
+        ((InsertTabletNode) value)
+            .serializeToWAL(buffer, tabletInfo.tabletStart, tabletInfo.tabletEnd);
+        break;
+      case INSERT_ROW_PLAN:
+      case INSERT_ROW_NODE:
+      case DELETE_PLAN:
+      case MEMORY_TABLE_SNAPSHOT:
+        value.serializeToWAL(buffer);
+        break;
+    }
+  }
+
+  private static class TabletInfo {
+    /** start row of insert tablet */
+    private final int tabletStart;
+    /** end row of insert tablet */
+    private final int tabletEnd;
+
+    public TabletInfo(int tabletStart, int tabletEnd) {
+      this.tabletStart = tabletStart;
+      this.tabletEnd = tabletEnd;
+    }
+  }
+
+  @Override
+  public boolean isSignal() {
+    return false;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/SignalWALEntry.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALSignalEntry.java
similarity index 51%
rename from server/src/main/java/org/apache/iotdb/db/wal/buffer/SignalWALEntry.java
rename to server/src/main/java/org/apache/iotdb/db/wal/buffer/WALSignalEntry.java
index d2e671d587..854c2692a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/SignalWALEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALSignalEntry.java
@@ -18,34 +18,45 @@
*/
package org.apache.iotdb.db.wal.buffer;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import java.nio.ByteBuffer;
-/** This class provides a signal to help wal buffer dealing with some special cases */
-public class SignalWALEntry extends WALEntry {
- private final SignalType signalType;
-
- public SignalWALEntry(SignalType signalType) {
+/**
+ * This entry carries no value and only signals the wal buffer to handle a special case, e.g.
+ * rolling the wal log writer or marking the end of a .wal file's info part.
+ */
+public class WALSignalEntry extends WALEntry {
+ /** Same as {@code WALSignalEntry(signalType, false)}. */
+ public WALSignalEntry(WALEntryType signalType) {
this(signalType, false);
}
- public SignalWALEntry(SignalType signalType, boolean wait) {
- super(Long.MIN_VALUE, new DeletePlan(), wait);
- this.signalType = signalType;
+ /**
+ * Creates a signal entry with memtable id {@code Long.MIN_VALUE} and a null value.
+ *
+ * @throws RuntimeException if {@code signalType} is one of the wal info entry types
+ */
+ public WALSignalEntry(WALEntryType signalType, boolean wait) {
+ super(signalType, Long.MIN_VALUE, null, wait);
+ // info types carry a value to serialize, which a signal entry does not have
+ switch (signalType) {
+ case INSERT_TABLET_PLAN:
+ case INSERT_TABLET_NODE:
+ case INSERT_ROW_PLAN:
+ case INSERT_ROW_NODE:
+ case DELETE_PLAN:
+ case MEMORY_TABLE_SNAPSHOT:
+ throw new RuntimeException("Cannot use wal info type as wal signal type");
+ default:
+ break;
+ }
}
+ /** A signal is serialized as its type code only, i.e. one byte. */
@Override
- public boolean isSignal() {
- return true;
+ public int serializedSize() {
+ return Byte.BYTES;
}
- public SignalType getSignalType() {
- return signalType;
+ /** Writes only the type code. */
+ @Override
+ public void serialize(IWALByteBufferView buffer) {
+ buffer.put(type.getCode());
}
- public enum SignalType {
- /** signal wal buffer has been closed */
- CLOSE_SIGNAL,
- /** signal wal buffer to roll wal log writer */
- ROLL_WAL_LOG_WRITER_SIGNAL,
+ /** Same as {@link #serialize(IWALByteBufferView)} but writes into a plain ByteBuffer. */
+ public void serialize(ByteBuffer buffer) {
+ buffer.put(type.getCode());
+ }
+
+ @Override
+ public boolean isSignal() {
+ return true;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/LogWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/LogWriter.java
index 635022d28d..34815c1564 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/LogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/LogWriter.java
@@ -39,11 +39,10 @@ import java.nio.channels.FileChannel;
public abstract class LogWriter implements ILogWriter {
private static final Logger logger = LoggerFactory.getLogger(LogWriter.class);
- private final File logFile;
- private final FileOutputStream logStream;
- private final FileChannel logChannel;
-
- private long size;
+ // protected so that subclasses such as WALWriter can use them
+ /** the log file being written */
+ protected final File logFile;
+ /** output stream of logFile */
+ protected final FileOutputStream logStream;
+ /** channel of logStream */
+ protected final FileChannel logChannel;
+ /** presumably the number of bytes written so far — confirm against write() */
+ protected long size;
public LogWriter(File logFile) throws FileNotFoundException {
this.logFile = logFile;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
new file mode 100644
index 0000000000..a993fe7c57
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.io;
+
+import org.apache.iotdb.db.wal.buffer.WALEntry;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.Iterator;
+
+import static org.apache.iotdb.db.wal.io.WALWriter.MAGIC_STRING;
+import static org.apache.iotdb.db.wal.io.WALWriter.MAGIC_STRING_BYTES;
+
+/**
+ * This reader returns {@link WALEntry} as {@link ByteBuffer}, the usage of WALByteBufReader is like
+ * {@link Iterator}.
+ */
+public class WALByteBufReader implements Closeable {
+ private final File logFile;
+ private final FileChannel channel;
+ private final WALMetaData metaData;
+ private final Iterator<Integer> sizeIterator;
+
+ public WALByteBufReader(File logFile) throws IOException {
+ this.logFile = logFile;
+ this.channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ);
+ if (!readTailMagic().equals(MAGIC_STRING)) {
+ throw new IOException(String.format("Broken wal file %s", logFile));
+ }
+ // load metadata size
+ ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
+ long position = channel.size() - MAGIC_STRING_BYTES - Integer.BYTES;
+ channel.read(metadataSizeBuf, position);
+ metadataSizeBuf.flip();
+ // load metadata
+ int metadataSize = metadataSizeBuf.getInt();
+ ByteBuffer metadataBuf = ByteBuffer.allocate(metadataSize);
+ channel.read(metadataBuf, position - metadataSize);
+ metadataBuf.flip();
+ metaData = WALMetaData.deserialize(metadataBuf);
+ // init iterator
+ sizeIterator = metaData.getBuffersSize().iterator();
+ channel.position(0);
+ }
+
+ /** Like {@link Iterator#hasNext()} */
+ public boolean hasNext() {
+ return sizeIterator.hasNext();
+ }
+
+ /** Like {@link Iterator#next()} */
+ public ByteBuffer next() throws IOException {
+ int size = sizeIterator.next();
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ channel.read(buffer);
+ buffer.clear();
+ return buffer;
+ }
+
+ private String readTailMagic() throws IOException {
+ ByteBuffer magicStringBytes = ByteBuffer.allocate(MAGIC_STRING_BYTES);
+ channel.read(magicStringBytes, channel.size() - MAGIC_STRING_BYTES);
+ magicStringBytes.flip();
+ return new String(magicStringBytes.array());
+ }
+
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+
+ public long getFirstSearchIndex() {
+ return metaData.getFirstSearchIndex();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALMetaData.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALMetaData.java
new file mode 100644
index 0000000000..35bbd5e67d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALMetaData.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.io;
+
+import org.apache.iotdb.db.utils.SerializedSize;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.wal.node.WALNode.DEFAULT_SEARCH_INDEX;
+
+/**
+ * Metadata stored at the tail of every wal file: the search index of the first entry, the number
+ * of entries and the size of each entry.
+ */
+public class WALMetaData implements SerializedSize {
+  /** Fixed part of the serialized form: first search index (8 bytes) + entry count (4 bytes). */
+  private static final int FIXED_SERIALIZED_SIZE = Long.BYTES + Integer.BYTES;
+
+  /** search index of first entry */
+  private long firstSearchIndex;
+
+  /** each entry's size, in the order the entries were added */
+  private final List<Integer> buffersSize;
+
+  /** Creates empty metadata whose first search index is DEFAULT_SEARCH_INDEX. */
+  public WALMetaData() {
+    this(DEFAULT_SEARCH_INDEX, new ArrayList<>());
+  }
+
+  /** Creates metadata backed directly by {@code buffersSize} (the list is not copied). */
+  public WALMetaData(long firstSearchIndex, List<Integer> buffersSize) {
+    this.buffersSize = buffersSize;
+    this.firstSearchIndex = firstSearchIndex;
+  }
+
+  /**
+   * Records one more entry of {@code size} bytes. {@code searchIndex} is only kept when this is
+   * the first entry.
+   */
+  public void add(int size, long searchIndex) {
+    firstSearchIndex = buffersSize.isEmpty() ? searchIndex : firstSearchIndex;
+    buffersSize.add(size);
+  }
+
+  /** Appends all entries of {@code metaData}, adopting its first search index if this is empty. */
+  public void addAll(WALMetaData metaData) {
+    firstSearchIndex = buffersSize.isEmpty() ? metaData.getFirstSearchIndex() : firstSearchIndex;
+    buffersSize.addAll(metaData.getBuffersSize());
+  }
+
+  @Override
+  public int serializedSize() {
+    return FIXED_SERIALIZED_SIZE + Integer.BYTES * buffersSize.size();
+  }
+
+  /** Writes first search index, entry count, then every entry size. */
+  public void serialize(ByteBuffer buffer) {
+    buffer.putLong(firstSearchIndex).putInt(buffersSize.size());
+    buffersSize.forEach(buffer::putInt);
+  }
+
+  /** Reads metadata in the layout written by {@link #serialize(ByteBuffer)}. */
+  public static WALMetaData deserialize(ByteBuffer buffer) {
+    long firstSearchIndex = buffer.getLong();
+    int remaining = buffer.getInt();
+    List<Integer> buffersSize = new ArrayList<>(remaining);
+    while (remaining-- > 0) {
+      buffersSize.add(buffer.getInt());
+    }
+    return new WALMetaData(firstSearchIndex, buffersSize);
+  }
+
+  public List<Integer> getBuffersSize() {
+    return buffersSize;
+  }
+
+  public long getFirstSearchIndex() {
+    return firstSearchIndex;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java
index 41a0cf145d..24fefc4c0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.wal.io;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALEntryType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,46 +29,36 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
-import java.io.EOFException;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
import java.util.NoSuchElementException;
/**
- * The usage of WALReader is like {@link Iterator}, which aims to control the memory usage of
- * reader.
+ * This reader returns {@link WALEntry} directly, the usage of WALReader is like {@link Iterator}.
*/
public class WALReader implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(WALReader.class);
/** 1/10 of .wal file size as buffer size */
private static final int STREAM_BUFFER_SIZE =
(int) IoTDBDescriptor.getInstance().getConfig().getWalFileSizeThresholdInByte() / 10;
- /** 1000 as default batch limit */
- private static final int BATCH_LIMIT = 1_000;
private final File logFile;
private final DataInputStream logStream;
- private final List<WALEntry> walEntries;
-
- private Iterator<WALEntry> itr = null;
+ /**
+ * Entry read ahead by hasNext() and handed out (then cleared) by next(); null when nothing is
+ * buffered.
+ * NOTE(review): hasNext() appears to return false on WAL_FILE_INFO_END_MARKER without
+ * remembering it, so a repeated hasNext() call would deserialize the metadata bytes that follow
+ * the marker; consider a sticky end-of-entries flag — confirm.
+ */
+ private WALEntry nextEntry;
private boolean fileCorrupted = false;
- public WALReader(File logFile) throws FileNotFoundException {
+ /**
+ * Opens {@code logFile} for sequential reading through a buffered stream of STREAM_BUFFER_SIZE
+ * bytes.
+ *
+ * @throws IOException if the file cannot be opened
+ */
+ public WALReader(File logFile) throws IOException {
this.logFile = logFile;
this.logStream =
new DataInputStream(
- new BufferedInputStream(new FileInputStream(logFile), STREAM_BUFFER_SIZE));
- this.walEntries = new LinkedList<>();
+ new BufferedInputStream(Files.newInputStream(logFile.toPath()), STREAM_BUFFER_SIZE));
}
/** Like {@link Iterator#hasNext()} */
public boolean hasNext() {
- if (itr != null && itr.hasNext()) {
+ if (nextEntry != null) {
return true;
}
// read WALEntries from log stream
@@ -75,14 +66,11 @@ public class WALReader implements Closeable {
if (fileCorrupted) {
return false;
}
- walEntries.clear();
- while (walEntries.size() < BATCH_LIMIT) {
- WALEntry walEntry = WALEntry.deserialize(logStream);
- walEntries.add(walEntry);
+ nextEntry = WALEntry.deserialize(logStream);
+ if (nextEntry.getType() == WALEntryType.WAL_FILE_INFO_END_MARKER) {
+ nextEntry = null;
+ return false;
}
- } catch (EOFException e) {
- // reach end of wal file
- fileCorrupted = true;
} catch (IllegalPathException e) {
fileCorrupted = true;
logger.warn(
@@ -92,19 +80,17 @@ public class WALReader implements Closeable {
logger.warn("Fail to read WALEntry from wal file {}, skip broken WALEntries.", logFile, e);
}
- if (walEntries.size() != 0) {
- itr = walEntries.iterator();
- return true;
- }
- return false;
+ return nextEntry != null;
}
/** Like {@link Iterator#next()} */
public WALEntry next() {
- if (itr == null) {
+ // read ahead when the caller did not call hasNext() first, as Iterator#next() permits;
+ // throws NoSuchElementException once no more entries can be read
+ if (!hasNext()) {
throw new NoSuchElementException();
}
- return itr.next();
+ // hand out the read-ahead entry and clear it so the next hasNext() reads a fresh one
+ WALEntry next = nextEntry;
+ nextEntry = null;
+ return next;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
index 5ce2bcb251..b0f809eab5 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
@@ -19,19 +19,63 @@
package org.apache.iotdb.db.wal.io;
import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALEntryType;
+import org.apache.iotdb.db.wal.buffer.WALSignalEntry;
import org.apache.iotdb.db.wal.utils.WALFileStatus;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
/** WALWriter writes the binary {@link WALEntry} into .wal file. */
public class WALWriter extends LogWriter {
+ /** Magic string written at the very end of a finished .wal file. */
+ public static final String MAGIC_STRING = "WAL";
+ /** Length of MAGIC_STRING in bytes, encoded with the platform default charset. */
+ public static final int MAGIC_STRING_BYTES = MAGIC_STRING.getBytes().length;
+
private WALFileStatus walFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
+ /** Metadata of all buffers written to this wal file; appended to the file tail on close. */
+ protected final WALMetaData metaData = new WALMetaData();
+
+ /** Creates a writer for {@code logFile}; opening the file is delegated to LogWriter. */
public WALWriter(File logFile) throws FileNotFoundException {
super(logFile);
}
+  /**
+   * Writes {@code buffer} to the file, first merging {@code entriesMetaData} (the metadata of the
+   * entries inside the buffer) into this file's metadata.
+   */
+  public void write(ByteBuffer buffer, WALMetaData entriesMetaData) throws IOException {
+    updateMetaData(entriesMetaData);
+    write(buffer);
+  }
+
+  /** Merges {@code other} into this file's metadata without writing anything. */
+  public void updateMetaData(WALMetaData other) {
+    metaData.addAll(other);
+  }
+
+  /**
+   * Appends the file tail: the end-of-info marker, this file's metadata, the metadata size and
+   * finally the magic string.
+   */
+  private void endFile() throws IOException {
+    WALSignalEntry endMarker = new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER);
+    int metaDataSize = metaData.serializedSize();
+    int tailSize = endMarker.serializedSize() + metaDataSize + Integer.BYTES + MAGIC_STRING_BYTES;
+    ByteBuffer tail = ByteBuffer.allocate(tailSize);
+    endMarker.serialize(tail);
+    metaData.serialize(tail);
+    tail.putInt(metaDataSize).put(MAGIC_STRING.getBytes());
+    write(tail);
+  }
+
+  /**
+   * Writes the file tail, then closes the underlying writer. The writer is closed even if writing
+   * the tail fails, so the file handle is not leaked. That failure is still rethrown, with any
+   * close failure attached as suppressed.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      endFile();
+    } catch (IOException | RuntimeException e) {
+      try {
+        super.close();
+      } catch (IOException closeError) {
+        e.addSuppressed(closeError);
+      }
+      throw e;
+    }
+    super.close();
+  }
+
public void updateFileStatus(WALFileStatus walFileStatus) {
if (walFileStatus == WALFileStatus.CONTAINS_SEARCH_INDEX) {
this.walFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
index 22030be9e8..662465359e 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.wal.node;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
@@ -28,8 +27,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.wal.exception.WALException;
import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
-import java.util.List;
-
/** This class provides fake wal node when wal is disabled or exception happens. */
public class WALFakeNode implements IWALNode {
private final WALFlushListener.Status status;
@@ -104,16 +101,6 @@ public class WALFakeNode implements IWALNode {
throw new UnsupportedOperationException();
}
- @Override
- public IConsensusRequest getReq(long index) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<IConsensusRequest> getReqs(long startIndex, int num) {
- throw new UnsupportedOperationException();
- }
-
@Override
public ReqIterator getReqIterator(long startIndex) {
throw new UnsupportedOperationException();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 3e50041e73..f3eb5c2271 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
@@ -34,24 +35,21 @@ import org.apache.iotdb.db.engine.flush.FlushStatus;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.wal.buffer.IWALBuffer;
-import org.apache.iotdb.db.wal.buffer.SignalWALEntry;
import org.apache.iotdb.db.wal.buffer.WALBuffer;
import org.apache.iotdb.db.wal.buffer.WALEntry;
import org.apache.iotdb.db.wal.buffer.WALEntryType;
+import org.apache.iotdb.db.wal.buffer.WALInfoEntry;
+import org.apache.iotdb.db.wal.buffer.WALSignalEntry;
import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
-import org.apache.iotdb.db.wal.io.WALReader;
+import org.apache.iotdb.db.wal.io.WALByteBufReader;
import org.apache.iotdb.db.wal.utils.WALFileStatus;
import org.apache.iotdb.db.wal.utils.WALFileUtils;
import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
@@ -63,6 +61,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -129,33 +128,33 @@ public class WALNode implements IWALNode {
+ /** Wraps {@code insertRowPlan} in a WALInfoEntry and logs it. */
@Override
public WALFlushListener log(long memTableId, InsertRowPlan insertRowPlan) {
- WALEntry walEntry = new WALEntry(memTableId, insertRowPlan);
+ WALEntry walEntry = new WALInfoEntry(memTableId, insertRowPlan);
return log(walEntry);
}
+ /** Wraps {@code insertRowNode} in a WALInfoEntry and logs it. */
@Override
public WALFlushListener log(long memTableId, InsertRowNode insertRowNode) {
- WALEntry walEntry = new WALEntry(memTableId, insertRowNode);
+ WALEntry walEntry = new WALInfoEntry(memTableId, insertRowNode);
return log(walEntry);
}
+ /** Wraps rows {@code start}..{@code end} of {@code insertTabletPlan} in a WALInfoEntry and logs it. */
@Override
public WALFlushListener log(
long memTableId, InsertTabletPlan insertTabletPlan, int start, int end) {
- WALEntry walEntry = new WALEntry(memTableId, insertTabletPlan, start, end);
+ WALEntry walEntry = new WALInfoEntry(memTableId, insertTabletPlan, start, end);
return log(walEntry);
}
+ /** Wraps rows {@code start}..{@code end} of {@code insertTabletNode} in a WALInfoEntry and logs it. */
@Override
public WALFlushListener log(
long memTableId, InsertTabletNode insertTabletNode, int start, int end) {
- WALEntry walEntry = new WALEntry(memTableId, insertTabletNode, start, end);
+ WALEntry walEntry = new WALInfoEntry(memTableId, insertTabletNode, start, end);
return log(walEntry);
}
+ /** Wraps {@code deletePlan} in a WALInfoEntry and logs it. */
@Override
public WALFlushListener log(long memTableId, DeletePlan deletePlan) {
- WALEntry walEntry = new WALEntry(memTableId, deletePlan);
+ WALEntry walEntry = new WALInfoEntry(memTableId, deletePlan);
return log(walEntry);
}
@@ -421,7 +420,7 @@ public class WALNode implements IWALNode {
memTableSnapshotCount.compute(memTable.getMemTableId(), (k, v) -> v == null ? 1 : v + 1);
// roll wal log writer to make sure first version id will be updated
WALEntry rollWALFileSignal =
- new SignalWALEntry(SignalWALEntry.SignalType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
+ new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
WALFlushListener fileRolledListener = log(rollWALFileSignal);
if (fileRolledListener.waitForResult() == WALFlushListener.Status.FAILURE) {
logger.error("Fail to roll wal log writer.", fileRolledListener.getCause());
@@ -437,7 +436,7 @@ public class WALNode implements IWALNode {
"CheckpointManager$DeleteOutdatedFileTask.snapshotOrFlushOldestMemTable");
try {
// log snapshot in a new .wal file
- WALEntry walEntry = new WALEntry(memTable.getMemTableId(), memTable, true);
+ WALEntry walEntry = new WALInfoEntry(memTable.getMemTableId(), memTable, true);
WALFlushListener flushListener = log(walEntry);
// wait until getting the result
@@ -463,193 +462,7 @@ public class WALNode implements IWALNode {
this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
}
- /**
- * Merge insert nodes sharing same search index ( e.g. tablet-100, tablet-100, tablet-100 will be
- * merged to one multi-tablet). <br>
- * Notice: the continuity of insert nodes sharing same search index should be protected by the
- * upper layer.
- */
- private static InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
- int size = insertNodes.size();
- if (size == 0) {
- return null;
- }
- if (size == 1) {
- InsertNode insertNode = insertNodes.get(0);
- insertNode.setPlanNodeId(new PlanNodeId(""));
- return insertNode;
- }
-
- InsertNode result;
- if (insertNodes.get(0) instanceof InsertTabletNode) { // merge to InsertMultiTabletsNode
- List<Integer> index = new ArrayList<>(size);
- List<InsertTabletNode> insertTabletNodes = new ArrayList<>(size);
- int i = 0;
- for (InsertNode insertNode : insertNodes) {
- insertTabletNodes.add((InsertTabletNode) insertNode);
- index.add(i);
- i++;
- }
- result = new InsertMultiTabletsNode(new PlanNodeId(""), index, insertTabletNodes);
- } else { // merge to InsertRowsNode or InsertRowsOfOneDeviceNode
- boolean sameDevice = true;
- PartialPath device = insertNodes.get(0).getDevicePath();
- List<Integer> index = new ArrayList<>(size);
- List<InsertRowNode> insertRowNodes = new ArrayList<>(size);
- int i = 0;
- for (InsertNode insertNode : insertNodes) {
- if (sameDevice && !insertNode.getDevicePath().equals(device)) {
- sameDevice = false;
- }
- insertRowNodes.add((InsertRowNode) insertNode);
- index.add(i);
- i++;
- }
- result =
- sameDevice
- ? new InsertRowsOfOneDeviceNode(new PlanNodeId(""), index, insertRowNodes)
- : new InsertRowsNode(new PlanNodeId(""), index, insertRowNodes);
- }
- result.setSearchIndex(insertNodes.get(0).getSearchIndex());
- result.setDevicePath(insertNodes.get(0).getDevicePath());
- return result;
- }
-
- @Override
- public IConsensusRequest getReq(long index) {
- // find file
- File[] currentFiles = WALFileUtils.listAllWALFiles(logDirectory);
- WALFileUtils.ascSortByVersionId(currentFiles);
- int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(currentFiles, index);
- if (fileIndex < 0) {
- return null;
- }
- // find log
- List<InsertNode> tmpNodes = new ArrayList<>();
- for (int i = fileIndex; i < currentFiles.length; i++) {
- // cannot find anymore
- if (index < WALFileUtils.parseStartSearchIndex(currentFiles[i].getName())) {
- if (!tmpNodes.isEmpty()) {
- return mergeInsertNodes(tmpNodes);
- } else {
- break;
- }
- }
- // cannot find any in this file
- if (WALFileUtils.parseStatusCode(currentFiles[i].getName())
- == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
- if (!tmpNodes.isEmpty()) {
- return mergeInsertNodes(tmpNodes);
- } else {
- continue;
- }
- }
-
- try (WALReader walReader = new WALReader(currentFiles[i])) {
- while (walReader.hasNext()) {
- WALEntry walEntry = walReader.next();
- if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
- || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
- InsertNode insertNode = (InsertNode) walEntry.getValue();
- if (insertNode.getSearchIndex() == index) {
- tmpNodes.add(insertNode);
- } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
- return mergeInsertNodes(tmpNodes);
- }
- } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
- return mergeInsertNodes(tmpNodes);
- }
- }
- } catch (FileNotFoundException e) {
- logger.debug(
- "WAL file {} has been deleted, try to call getReq({}) again.", currentFiles[i], index);
- return getReq(index);
- } catch (Exception e) {
- logger.error("Fail to read wal from wal file {}", currentFiles[i], e);
- }
- }
- // not find or not complete
- return null;
- }
-
- @Override
- public List<IConsensusRequest> getReqs(long startIndex, int num) {
- List<IConsensusRequest> result = new ArrayList<>(num);
- // find file
- File[] currentFiles = WALFileUtils.listAllWALFiles(logDirectory);
- WALFileUtils.ascSortByVersionId(currentFiles);
- int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(currentFiles, startIndex);
- if (fileIndex < 0) {
- return result;
- }
- // find logs
- long endIndex = startIndex + num - 1;
- long targetIndex = startIndex;
- List<InsertNode> tmpNodes = new ArrayList<>();
- for (int i = fileIndex; i < currentFiles.length; i++) {
- // cannot find anymore
- if (endIndex < WALFileUtils.parseStartSearchIndex(currentFiles[i].getName())) {
- if (!tmpNodes.isEmpty()) {
- result.add(mergeInsertNodes(tmpNodes));
- } else {
- break;
- }
- }
- // cannot find any in this file
- if (WALFileUtils.parseStatusCode(currentFiles[i].getName())
- == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
- if (!tmpNodes.isEmpty()) {
- result.add(mergeInsertNodes(tmpNodes));
- } else {
- continue;
- }
- }
-
- try (WALReader walReader = new WALReader(currentFiles[i])) {
- while (walReader.hasNext()) {
- WALEntry walEntry = walReader.next();
- if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
- || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
- InsertNode insertNode = (InsertNode) walEntry.getValue();
- if (insertNode.getSearchIndex() == targetIndex) {
- tmpNodes.add(insertNode);
- } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
- result.add(mergeInsertNodes(tmpNodes));
- if (result.size() == num) {
- return result;
- }
- targetIndex++;
- tmpNodes = new ArrayList<>();
- // remember to add current insert node
- if (insertNode.getSearchIndex() == targetIndex) {
- tmpNodes.add(insertNode);
- }
- }
- } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
- result.add(mergeInsertNodes(tmpNodes));
- if (result.size() == num) {
- return result;
- }
- targetIndex++;
- tmpNodes = new ArrayList<>();
- }
- }
- } catch (FileNotFoundException e) {
- logger.debug(
- "WAL file {} has been deleted, try to call getReqs({}, {}) again.",
- currentFiles[i],
- startIndex,
- num);
- return getReqs(startIndex, num);
- } catch (Exception e) {
- logger.error("Fail to read wal from wal file {}", currentFiles[i], e);
- }
- }
-
- return result;
- }
-
- /** This iterator is not concurrency-safe */
+ /** This iterator is not concurrency-safe, cannot read the current-writing wal file. */
@Override
public ReqIterator getReqIterator(long startIndex) {
return new PlanNodeIterator(startIndex);
@@ -665,9 +478,9 @@ public class WALNode implements IWALNode {
/** true means filesToSearch and currentFileIndex are outdated, call updateFilesToSearch */
private boolean needUpdatingFilesToSearch = true;
/** batch store insert nodes */
- private final List<InsertNode> insertNodes = new LinkedList<>();
+ private final List<IndexedConsensusRequest> insertNodes = new LinkedList<>();
/** iterator of insertNodes */
- private Iterator<InsertNode> itr = null;
+ private Iterator<IndexedConsensusRequest> itr = null;
public PlanNodeIterator(long startIndex) {
this.nextSearchIndex = startIndex;
@@ -697,7 +510,7 @@ public class WALNode implements IWALNode {
while (WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
== WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
currentFileIndex++;
- if (currentFileIndex >= filesToSearch.length) {
+ if (currentFileIndex >= filesToSearch.length - 1) {
needUpdatingFilesToSearch = true;
return false;
}
@@ -707,34 +520,38 @@ public class WALNode implements IWALNode {
while (WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
== WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
currentFileIndex++;
- if (currentFileIndex >= filesToSearch.length) {
+ if (currentFileIndex >= filesToSearch.length - 1) {
needUpdatingFilesToSearch = true;
return false;
}
}
- // find all insert plan of current wal file
- List<InsertNode> tmpNodes = new ArrayList<>();
+ // find all insert node of current wal file
+ List<IConsensusRequest> tmpNodes = new ArrayList<>();
long targetIndex = nextSearchIndex;
- try (WALReader walReader = new WALReader(filesToSearch[currentFileIndex])) {
- while (walReader.hasNext()) {
- WALEntry walEntry = walReader.next();
- if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
- || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
- InsertNode insertNode = (InsertNode) walEntry.getValue();
- if (insertNode.getSearchIndex() == targetIndex) {
- tmpNodes.add(insertNode);
+ try (WALByteBufReader walByteBufReader =
+ new WALByteBufReader(filesToSearch[currentFileIndex])) {
+ while (walByteBufReader.hasNext()) {
+ ByteBuffer buffer = walByteBufReader.next();
+ WALEntryType type = WALEntryType.valueOf(buffer.get());
+ if (type == WALEntryType.INSERT_TABLET_NODE || type == WALEntryType.INSERT_ROW_NODE) {
+ // see WALInfoEntry#serialize, entry type + memtable id + insert node type
+ buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + PlanNodeType.BYTES);
+ long searchIndex = buffer.getLong();
+ buffer.clear();
+ if (searchIndex == targetIndex) {
+ tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
} else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
- insertNodes.add(mergeInsertNodes(tmpNodes));
+ insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
targetIndex++;
tmpNodes = new ArrayList<>();
// remember to add current insert node
- if (insertNode.getSearchIndex() == targetIndex) {
- tmpNodes.add(insertNode);
+ if (searchIndex == targetIndex) {
+ tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
}
}
} else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
- insertNodes.add(mergeInsertNodes(tmpNodes));
+ insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
targetIndex++;
tmpNodes = new ArrayList<>();
}
@@ -755,32 +572,42 @@ public class WALNode implements IWALNode {
currentFileIndex++;
} else {
int fileIndex = currentFileIndex + 1;
- while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length) {
+ while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length - 1) {
// cannot find any in this file, find all slices of last insert plan
if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName())
== WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
- insertNodes.add(mergeInsertNodes(tmpNodes));
+ insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
tmpNodes = Collections.emptyList();
break;
}
-
- try (WALReader walReader = new WALReader(filesToSearch[fileIndex])) {
- while (walReader.hasNext()) {
- WALEntry walEntry = walReader.next();
- if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
- || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
- InsertNode insertNode = (InsertNode) walEntry.getValue();
- if (insertNode.getSearchIndex() == targetIndex) {
- tmpNodes.add(insertNode);
+ // read until find all insert node whose search index equals target index
+ try (WALByteBufReader walByteBufReader = new WALByteBufReader(filesToSearch[fileIndex])) {
+ if (walByteBufReader.getFirstSearchIndex() != targetIndex) {
+ insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
+ tmpNodes = Collections.emptyList();
+ break;
+ } else {
+ while (walByteBufReader.hasNext()) {
+ ByteBuffer buffer = walByteBufReader.next();
+ WALEntryType type = WALEntryType.valueOf(buffer.get());
+ if (type == WALEntryType.INSERT_TABLET_NODE
+ || type == WALEntryType.INSERT_ROW_NODE) {
+ // see WALInfoEntry#serialize, entry type + memtable id + insert node type
+ buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + PlanNodeType.BYTES);
+ long searchIndex = buffer.getLong();
+ buffer.clear();
+ if (searchIndex == targetIndex) {
+ tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
+ } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+ insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
+ tmpNodes = Collections.emptyList();
+ break;
+ }
} else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
- insertNodes.add(mergeInsertNodes(tmpNodes));
+ insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes));
tmpNodes = Collections.emptyList();
break;
}
- } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
- insertNodes.add(mergeInsertNodes(tmpNodes));
- tmpNodes = Collections.emptyList();
- break;
}
}
} catch (FileNotFoundException e) {
@@ -806,7 +633,7 @@ public class WALNode implements IWALNode {
}
// update file index and version id
- if (currentFileIndex >= filesToSearch.length) {
+ if (currentFileIndex >= filesToSearch.length - 1) {
needUpdatingFilesToSearch = true;
}
@@ -824,27 +651,27 @@ public class WALNode implements IWALNode {
throw new NoSuchElementException();
}
- InsertNode insertNode = itr.next();
- if (insertNode.getSearchIndex() == nextSearchIndex) {
+ IndexedConsensusRequest request = itr.next();
+ if (request.getSearchIndex() == nextSearchIndex) {
nextSearchIndex++;
- } else if (insertNode.getSearchIndex() > nextSearchIndex) {
+ } else if (request.getSearchIndex() > nextSearchIndex) {
logger.warn(
"Search index of wal node-{} are not continuously, skip from {} to {}.",
identifier,
nextSearchIndex,
- insertNode.getSearchIndex());
- skipTo(insertNode.getSearchIndex() + 1);
+ request.getSearchIndex());
+ skipTo(request.getSearchIndex() + 1);
} else {
logger.error(
"Search index of wal node-{} are out of order, {} is before {}.",
identifier,
nextSearchIndex,
- insertNode.getSearchIndex());
+ request.getSearchIndex());
throw new RuntimeException(
String.format("Search index of wal node-%s are out of order", identifier));
}
- return new IndexedConsensusRequest(insertNode.getSearchIndex(), -1, insertNode);
+ return request;
}
@Override
@@ -893,7 +720,8 @@ public class WALNode implements IWALNode {
int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(filesToSearch, nextSearchIndex);
logger.debug(
"searchIndex: {}, result: {}, files: {}, ", nextSearchIndex, fileIndex, filesToSearch);
- if (filesToSearch != null && fileIndex >= 0) { // possible to find next
+ if (filesToSearch != null
+ && (fileIndex >= 0 && fileIndex < filesToSearch.length - 1)) { // possible to find next
this.filesToSearch = filesToSearch;
this.currentFileIndex = fileIndex;
this.needUpdatingFilesToSearch = false;
@@ -934,8 +762,7 @@ public class WALNode implements IWALNode {
@TestOnly
public void rollWALFile() {
- WALEntry rollWALFileSignal =
- new SignalWALEntry(SignalWALEntry.SignalType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
+ WALEntry rollWALFileSignal = new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
WALFlushListener walFlushListener = log(rollWALFileSignal);
if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
logger.error(
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
index ed8042e08d..2bd4e6e114 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.buffer.WALEntry;
import org.apache.iotdb.db.wal.buffer.WALEntryType;
import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
+import org.apache.iotdb.db.wal.io.WALMetaData;
import org.apache.iotdb.db.wal.io.WALReader;
import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
import org.apache.iotdb.db.wal.utils.CheckpointFileUtils;
@@ -39,11 +40,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader.DEFAULT_SEARCH_INDEX;
+
/** This task is responsible for the recovery of one wal node. */
public class WALNodeRecoverTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(WALNodeRecoverTask.class);
@@ -103,7 +107,7 @@ public class WALNodeRecoverTask implements Runnable {
checkpointFile.delete();
}
// recover version id and search index
- long[] indexInfo = recoverLastSearchIndex();
+ long[] indexInfo = recoverLastFile();
long lastVersionId = indexInfo[0];
long lastSearchIndex = indexInfo[1];
WALManager.getInstance()
@@ -118,7 +122,7 @@ public class WALNodeRecoverTask implements Runnable {
}
}
- private long[] recoverLastSearchIndex() {
+ private long[] recoverLastFile() {
File[] walFiles = WALFileUtils.listAllWALFiles(logDirectory);
if (walFiles == null || walFiles.length == 0) {
return new long[] {0L, 0L};
@@ -128,22 +132,33 @@ public class WALNodeRecoverTask implements Runnable {
File lastWALFile = walFiles[walFiles.length - 1];
long lastVersionId = WALFileUtils.parseVersionId(lastWALFile.getName());
long lastSearchIndex = WALFileUtils.parseStartSearchIndex(lastWALFile.getName());
+ WALMetaData metaData = new WALMetaData();
WALFileStatus fileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
try (WALReader walReader = new WALReader(lastWALFile)) {
while (walReader.hasNext()) {
WALEntry walEntry = walReader.next();
+ long searchIndex = DEFAULT_SEARCH_INDEX;
if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
|| walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
InsertNode insertNode = (InsertNode) walEntry.getValue();
if (insertNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
+ searchIndex = insertNode.getSearchIndex();
lastSearchIndex = Math.max(lastSearchIndex, insertNode.getSearchIndex());
fileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
}
}
+ metaData.add(walEntry.serializedSize(), searchIndex);
}
} catch (Exception e) {
logger.warn("Fail to read wal logs from {}, skip them", lastWALFile, e);
}
+ // make sure last wal file is correct
+ WALRecoverWriter walRecoverWriter = new WALRecoverWriter(lastWALFile);
+ try {
+ walRecoverWriter.recover(metaData);
+ } catch (IOException e) {
+ logger.error("Fail to recover metadata of wal file {}", lastWALFile);
+ }
// rename last wal file when file status are inconsistent
if (WALFileUtils.parseStatusCode(lastWALFile.getName()) != fileStatus) {
String targetName =
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverWriter.java
new file mode 100644
index 0000000000..92591f7837
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.recover;
+
+import org.apache.iotdb.db.wal.io.WALMetaData;
+import org.apache.iotdb.db.wal.io.WALWriter;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+import static org.apache.iotdb.db.wal.io.WALWriter.MAGIC_STRING;
+import static org.apache.iotdb.db.wal.io.WALWriter.MAGIC_STRING_BYTES;
+
+/**
+ * Checks whether a wal file is broken and recovers it.
+ *
+ * <p>A file whose last bytes are not {@code MAGIC_STRING} is treated as broken: it is truncated to
+ * the total size recorded in the given {@link WALMetaData}, and that metadata is then flushed
+ * through a {@link WALWriter}.
+ */
+public class WALRecoverWriter {
+  private final File logFile;
+
+  public WALRecoverWriter(File logFile) {
+    this.logFile = logFile;
+  }
+
+  /**
+   * Recovers the wal file if it does not end with {@code MAGIC_STRING}.
+   *
+   * @param metaData metadata of the entries to keep; the sum of its buffer sizes is the length the
+   *     file is truncated to
+   * @throws IOException if the file cannot be read, truncated, or rewritten
+   */
+  public void recover(WALMetaData metaData) throws IOException {
+    if (!readTailMagic().equals(MAGIC_STRING)) {
+      // truncate broken data; the channel must be writable, otherwise truncate() throws
+      // NonWritableChannelException
+      long size = metaData.getBuffersSize().stream().mapToLong(Integer::longValue).sum();
+      try (FileChannel channel = FileChannel.open(logFile.toPath(), StandardOpenOption.WRITE)) {
+        channel.truncate(size);
+      }
+      // flush metadata
+      try (WALWriter walWriter = new WALWriter(logFile)) {
+        walWriter.updateMetaData(metaData);
+      }
+    }
+  }
+
+  /**
+   * Reads the last {@code MAGIC_STRING_BYTES} bytes of the file as a string.
+   *
+   * @return the tail bytes as a string, or an empty string if the file is too short to hold the
+   *     magic string
+   */
+  private String readTailMagic() throws IOException {
+    try (FileChannel channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ)) {
+      long position = channel.size() - MAGIC_STRING_BYTES;
+      if (position < 0) {
+        // a negative position would make FileChannel#read throw IllegalArgumentException
+        return "";
+      }
+      ByteBuffer magicStringBytes = ByteBuffer.allocate(MAGIC_STRING_BYTES);
+      // a single positional read may return fewer bytes than requested, so keep reading
+      while (magicStringBytes.hasRemaining()) {
+        int read = channel.read(magicStringBytes, position + magicStringBytes.position());
+        if (read < 0) {
+          break;
+        }
+      }
+      magicStringBytes.flip();
+      return new String(magicStringBytes.array(), 0, magicStringBytes.limit());
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
index 12a8120013..a7eef37d48 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
@@ -85,7 +85,7 @@ public class InsertRowNodeSerdeTest {
Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), dataInputStream.readShort());
- InsertRowNode tmpNode = InsertRowNode.deserialize(dataInputStream);
+ InsertRowNode tmpNode = InsertRowNode.deserializeFromWAL(dataInputStream);
tmpNode.setPlanNodeId(insertRowNode.getPlanNodeId());
Assert.assertEquals(insertRowNode, tmpNode);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
index b6604aedb3..9c761d0fbc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
@@ -76,7 +76,7 @@ public class InsertTabletNodeSerdeTest {
Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), dataInputStream.readShort());
- InsertTabletNode tmpNode = InsertTabletNode.deserialize(dataInputStream);
+ InsertTabletNode tmpNode = InsertTabletNode.deserializeFromWAL(dataInputStream);
tmpNode.setPlanNodeId(insertTabletNode.getPlanNodeId());
Assert.assertEquals(insertTabletNode, tmpNode);
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index 6e0bcedab4..c068e4d64b 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.exception.SystemCheckException;
import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALInfoEntry;
import org.apache.iotdb.db.wal.io.ILogWriter;
import org.apache.iotdb.db.wal.io.WALFileTest;
import org.apache.iotdb.db.wal.io.WALWriter;
@@ -86,9 +87,10 @@ public class WalCheckerTest {
walNodeDir, WALFileUtils.getLogFileName(i, 0, WALFileStatus.CONTAINS_SEARCH_INDEX));
int fakeMemTableId = 1;
List<WALEntry> walEntries = new ArrayList<>();
- walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getInsertRowPlan(DEVICE_ID)));
- walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getInsertTabletPlan(DEVICE_ID)));
- walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getDeletePlan(DEVICE_ID)));
+ walEntries.add(new WALInfoEntry(fakeMemTableId, WALFileTest.getInsertRowPlan(DEVICE_ID)));
+ walEntries.add(
+ new WALInfoEntry(fakeMemTableId, WALFileTest.getInsertTabletPlan(DEVICE_ID)));
+ walEntries.add(new WALInfoEntry(fakeMemTableId, WALFileTest.getDeletePlan(DEVICE_ID)));
int size = 0;
for (WALEntry walEntry : walEntries) {
size += walEntry.serializedSize();
@@ -124,9 +126,10 @@ public class WalCheckerTest {
walNodeDir, WALFileUtils.getLogFileName(i, 0, WALFileStatus.CONTAINS_SEARCH_INDEX));
int fakeMemTableId = 1;
List<WALEntry> walEntries = new ArrayList<>();
- walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getInsertRowPlan(DEVICE_ID)));
- walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getInsertTabletPlan(DEVICE_ID)));
- walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getDeletePlan(DEVICE_ID)));
+ walEntries.add(new WALInfoEntry(fakeMemTableId, WALFileTest.getInsertRowPlan(DEVICE_ID)));
+ walEntries.add(
+ new WALInfoEntry(fakeMemTableId, WALFileTest.getInsertTabletPlan(DEVICE_ID)));
+ walEntries.add(new WALInfoEntry(fakeMemTableId, WALFileTest.getDeletePlan(DEVICE_ID)));
int size = 0;
for (WALEntry walEntry : walEntries) {
size += walEntry.serializedSize();
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java b/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java
index a803a410b6..c6b88c4492 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java
@@ -114,7 +114,7 @@ public abstract class WALBufferCommonTest {
InsertRowPlan insertRowPlan = getInsertRowPlan(devicePath + memTableId, i);
expectedInsertRowPlans.add(insertRowPlan);
- WALEntry walEntry = new WALEntry(memTableId, insertRowPlan);
+ WALEntry walEntry = new WALInfoEntry(memTableId, insertRowPlan);
walBuffer.write(walEntry);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
index 9b79403465..bf6460d830 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.wal.buffer.WALEntry;
import org.apache.iotdb.db.wal.buffer.WALEntryType;
+import org.apache.iotdb.db.wal.buffer.WALInfoEntry;
import org.apache.iotdb.db.wal.utils.WALByteBufferForTest;
import org.apache.iotdb.db.wal.utils.WALFileStatus;
import org.apache.iotdb.db.wal.utils.WALFileUtils;
@@ -75,11 +76,11 @@ public class WALFileTest {
public void testReadNormalFile() throws IOException, IllegalPathException {
int fakeMemTableId = 1;
List<WALEntry> expectedWALEntries = new ArrayList<>();
- expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowNode(devicePath)));
- expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletNode(devicePath)));
- expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowPlan(devicePath)));
- expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletPlan(devicePath)));
- expectedWALEntries.add(new WALEntry(fakeMemTableId, getDeletePlan(devicePath)));
+ expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertRowNode(devicePath)));
+ expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertTabletNode(devicePath)));
+ expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertRowPlan(devicePath)));
+ expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertTabletPlan(devicePath)));
+ expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getDeletePlan(devicePath)));
// test WALEntry.serializedSize
int size = 0;
for (WALEntry walEntry : expectedWALEntries) {
@@ -122,11 +123,11 @@ public class WALFileTest {
public void testReadBrokenFile() throws IOException, IllegalPathException {
int fakeMemTableId = 1;
List<WALEntry> expectedWALEntries = new ArrayList<>();
- expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowNode(devicePath)));
- expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletNode(devicePath)));
- expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowPlan(devicePath)));
- expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletPlan(devicePath)));
- expectedWALEntries.add(new WALEntry(fakeMemTableId, getDeletePlan(devicePath)));
+ expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertRowNode(devicePath)));
+ expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertTabletNode(devicePath)));
+ expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertRowPlan(devicePath)));
+ expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getInsertTabletPlan(devicePath)));
+ expectedWALEntries.add(new WALInfoEntry(fakeMemTableId, getDeletePlan(devicePath)));
// test WALEntry.serializedSize
int size = Byte.BYTES;
for (WALEntry walEntry : expectedWALEntries) {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
index 3350894304..65ab61780c 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
@@ -21,19 +21,18 @@ package org.apache.iotdb.db.wal.node;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.wal.buffer.WALEntry;
import org.apache.iotdb.db.wal.utils.WALMode;
-import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
@@ -44,7 +43,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -127,170 +125,79 @@ public class ConsensusReqReaderTest {
// _6-5-1.wal
insertRowNode = getInsertRowNode(devicePath);
insertRowNode.setSearchIndex(6);
- WALFlushListener walFlushListener = walNode.log(0, insertRowNode); // 6
- walFlushListener.waitForResult();
- }
-
- @Test
- public void scenario01TestGetReq01() throws Exception {
- simulateFileScenario01();
-
- IConsensusRequest request;
- request = walNode.getReq(1);
- Assert.assertTrue(request instanceof InsertRowNode);
- Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
- request = walNode.getReq(2);
- Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
- Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
- Assert.assertEquals(
- 3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
- request = walNode.getReq(3);
- Assert.assertTrue(request instanceof InsertRowsNode);
- Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
- Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
- request = walNode.getReq(4);
- Assert.assertTrue(request instanceof InsertMultiTabletsNode);
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
- request = walNode.getReq(5);
- Assert.assertTrue(request instanceof InsertTabletNode);
- Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
- request = walNode.getReq(6);
- Assert.assertNull(request);
- }
-
- @Test
- public void scenario01TestGetReqs01() throws Exception {
- simulateFileScenario01();
- List<IConsensusRequest> requests;
- IConsensusRequest request;
-
- requests = walNode.getReqs(1, 6);
- Assert.assertEquals(5, requests.size());
- request = requests.get(0);
- Assert.assertTrue(request instanceof InsertRowNode);
- Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
- request = requests.get(1);
- Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
- Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
- Assert.assertEquals(
- 3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
- request = requests.get(2);
- Assert.assertTrue(request instanceof InsertRowsNode);
- Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
- Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
- request = requests.get(3);
- Assert.assertTrue(request instanceof InsertMultiTabletsNode);
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
- request = requests.get(4);
- Assert.assertTrue(request instanceof InsertTabletNode);
- Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
- }
-
- @Test
- public void scenario01TestGetReqs02() throws Exception {
- simulateFileScenario01();
- List<IConsensusRequest> requests;
- IConsensusRequest request;
-
- requests = walNode.getReqs(3, 1);
- Assert.assertEquals(1, requests.size());
- request = requests.get(0);
- Assert.assertTrue(request instanceof InsertRowsNode);
- Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
- Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
- }
-
- @Test
- public void scenario01TestGetReqs03() throws Exception {
- simulateFileScenario01();
- List<IConsensusRequest> requests;
- IConsensusRequest request;
-
- requests = walNode.getReqs(4, 2);
- Assert.assertEquals(2, requests.size());
- request = requests.get(0);
- Assert.assertTrue(request instanceof InsertMultiTabletsNode);
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
- request = requests.get(1);
- Assert.assertTrue(request instanceof InsertTabletNode);
- Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
- }
-
- @Test
- public void scenario01TestGetReqs04() throws Exception {
- simulateFileScenario01();
- List<IConsensusRequest> requests;
- IConsensusRequest request;
-
- requests = walNode.getReqs(5, 100);
- Assert.assertEquals(1, requests.size());
- request = requests.get(0);
- Assert.assertTrue(request instanceof InsertTabletNode);
- Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
- }
-
- @Test
- public void scenario01TestGetReqs05() throws Exception {
- simulateFileScenario01();
- List<IConsensusRequest> requests;
-
- requests = walNode.getReqs(6, 100);
- Assert.assertEquals(0, requests.size());
+ walNode.log(0, insertRowNode); // 6
}
@Test
public void scenario01TestGetReqIterator01() throws Exception {
simulateFileScenario01();
- IConsensusRequest request;
+ walNode.rollWALFile();
+
+ IndexedConsensusRequest request;
+ PlanNode planNode;
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertRowNode);
- Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
+ request = iterator.next();
+ Assert.assertEquals(1, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertRowNode);
+ Assert.assertEquals(1, ((InsertRowNode) planNode).getSearchIndex());
+ }
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
- Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
- Assert.assertEquals(
- 3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
+ request = iterator.next();
+ Assert.assertEquals(3, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertRowNode);
+ Assert.assertEquals(2, ((InsertRowNode) planNode).getSearchIndex());
+ }
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertRowsNode);
- Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
- Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
+ request = iterator.next();
+ Assert.assertEquals(3, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertRowNode);
+ Assert.assertEquals(3, ((InsertRowNode) planNode).getSearchIndex());
+ }
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertMultiTabletsNode);
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+ request = iterator.next();
+ Assert.assertEquals(4, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertTabletNode);
+ Assert.assertEquals(4, ((InsertTabletNode) planNode).getSearchIndex());
+ }
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertTabletNode);
- Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+ request = iterator.next();
+ Assert.assertEquals(1, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertTabletNode);
+ Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
+ }
Assert.assertFalse(iterator.hasNext());
}
@Test
public void scenario01TestGetReqIterator02() throws Exception {
simulateFileScenario01();
- IConsensusRequest request;
+
+ IndexedConsensusRequest request;
+ PlanNode planNode;
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(4);
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertMultiTabletsNode);
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
- Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertTabletNode);
- Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
-
+ request = iterator.next();
+ Assert.assertEquals(4, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertTabletNode);
+ Assert.assertEquals(4, ((InsertTabletNode) planNode).getSearchIndex());
+ }
Assert.assertFalse(iterator.hasNext());
+
// wait for next
ExecutorService checkThread = Executors.newSingleThreadExecutor();
Future<Boolean> future =
@@ -298,14 +205,19 @@ public class ConsensusReqReaderTest {
() -> {
iterator.waitForNextReady();
Assert.assertTrue(iterator.hasNext());
- IConsensusRequest req = iterator.next().getRequest();
- ;
- Assert.assertTrue(req instanceof InsertRowNode);
- Assert.assertEquals(6, ((InsertRowNode) req).getSearchIndex());
+ IndexedConsensusRequest req = iterator.next();
+ Assert.assertEquals(1, req.getRequests().size());
+ for (IConsensusRequest innerRequest : req.getRequests()) {
+ PlanNode node =
+ WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(node instanceof InsertTabletNode);
+ Assert.assertEquals(5, ((InsertTabletNode) node).getSearchIndex());
+ }
return true;
});
Thread.sleep(500);
+ walNode.rollWALFile();
InsertRowNode insertRowNode = getInsertRowNode(devicePath);
walNode.log(0, insertRowNode); // put -1 after 6
Assert.assertTrue(future.get());
@@ -314,26 +226,35 @@ public class ConsensusReqReaderTest {
@Test
public void scenario01TestGetReqIterator03() throws Exception {
simulateFileScenario01();
- IConsensusRequest request;
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
- Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertTabletNode);
- Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
-
Assert.assertFalse(iterator.hasNext());
+
// wait for next
ExecutorService checkThread = Executors.newSingleThreadExecutor();
Future<Boolean> future =
checkThread.submit(
() -> {
+ iterator.waitForNextReady();
+ IndexedConsensusRequest request;
+ PlanNode planNode;
+ Assert.assertTrue(iterator.hasNext());
+ request = iterator.next();
+ Assert.assertEquals(1, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertTabletNode);
+ Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
+ }
iterator.waitForNextReady();
Assert.assertTrue(iterator.hasNext());
- IConsensusRequest req = iterator.next().getRequest();
- ;
- Assert.assertTrue(req instanceof InsertRowNode);
- Assert.assertEquals(6, ((InsertRowNode) req).getSearchIndex());
+ request = iterator.next();
+ Assert.assertEquals(1, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertRowNode);
+ Assert.assertEquals(6, ((InsertRowNode) planNode).getSearchIndex());
+ }
return true;
});
@@ -341,73 +262,125 @@ public class ConsensusReqReaderTest {
walNode.rollWALFile();
InsertRowNode insertRowNode = getInsertRowNode(devicePath);
walNode.log(0, insertRowNode); // put -1 after 6
+ walNode.rollWALFile();
+ insertRowNode = getInsertRowNode(devicePath);
+ walNode.log(0, insertRowNode); // put -1 after 6
Assert.assertTrue(future.get());
}
@Test
public void scenario01TestGetReqIterator04() throws Exception {
simulateFileScenario01();
- IConsensusRequest request;
+ walNode.rollWALFile();
+
+ IndexedConsensusRequest request;
+ PlanNode planNode;
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertRowNode);
- Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
+ request = iterator.next();
+ Assert.assertEquals(1, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertRowNode);
+ Assert.assertEquals(1, ((InsertRowNode) planNode).getSearchIndex());
+ }
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
- Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
- Assert.assertEquals(
- 3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
+ request = iterator.next();
+ Assert.assertEquals(3, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertRowNode);
+ Assert.assertEquals(2, ((InsertRowNode) planNode).getSearchIndex());
+ }
iterator.skipTo(4);
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertMultiTabletsNode);
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+ request = iterator.next();
+ Assert.assertEquals(4, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertTabletNode);
+ Assert.assertEquals(4, ((InsertTabletNode) planNode).getSearchIndex());
+ }
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertTabletNode);
- Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+ request = iterator.next();
+ Assert.assertEquals(1, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertTabletNode);
+ Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
+ }
Assert.assertFalse(iterator.hasNext());
}
@Test
public void scenario01TestGetReqIterator05() throws Exception {
simulateFileScenario01();
- IConsensusRequest request;
+ walNode.rollWALFile();
+
+ IndexedConsensusRequest request;
+ PlanNode planNode;
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertTabletNode);
- Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+ request = iterator.next();
+ Assert.assertEquals(1, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertTabletNode);
+ Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
+ }
iterator.skipTo(2);
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
- Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
- Assert.assertEquals(
- 3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
+ request = iterator.next();
+ Assert.assertEquals(3, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertRowNode);
+ Assert.assertEquals(2, ((InsertRowNode) planNode).getSearchIndex());
+ }
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertRowsNode);
- Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
- Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
+ request = iterator.next();
+ Assert.assertEquals(3, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertRowNode);
+ Assert.assertEquals(3, ((InsertRowNode) planNode).getSearchIndex());
+ }
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertMultiTabletsNode);
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
- Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+ request = iterator.next();
+ Assert.assertEquals(4, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertTabletNode);
+ Assert.assertEquals(4, ((InsertTabletNode) planNode).getSearchIndex());
+ }
Assert.assertTrue(iterator.hasNext());
- request = iterator.next().getRequest();
- Assert.assertTrue(request instanceof InsertTabletNode);
- Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+ request = iterator.next();
+ Assert.assertEquals(1, request.getRequests().size());
+ for (IConsensusRequest innerRequest : request.getRequests()) {
+ planNode = WALEntry.deserializeInsertNode(innerRequest.serializeToByteBuffer());
+ Assert.assertTrue(planNode instanceof InsertTabletNode);
+ Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
+ }
+ Assert.assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void scenario01TestGetReqIterator06() throws Exception {
+ simulateFileScenario01();
+ ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
+ Assert.assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void scenario01TestGetReqIterator07() throws Exception {
+ simulateFileScenario01();
+ ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(6);
Assert.assertFalse(iterator.hasNext());
}
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/recover/WALRecoverManagerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/recover/WALRecoverManagerTest.java
index 7757d171df..049a8a3c5f 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/recover/WALRecoverManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/recover/WALRecoverManagerTest.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.wal.buffer.IWALBuffer;
import org.apache.iotdb.db.wal.buffer.WALBuffer;
import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALInfoEntry;
import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
@@ -178,7 +179,7 @@ public class WALRecoverManagerTest {
try {
while (walBuffer.getCurrentWALFileVersion() - firstWALVersionId < 2) {
WALEntry walEntry =
- new WALEntry(
+ new WALInfoEntry(
memTableId, getInsertTabletPlan(SG_NAME.concat("test_d" + memTableId)));
walBuffer.write(walEntry);
}
@@ -204,7 +205,7 @@ public class WALRecoverManagerTest {
long firstValidVersionId = walBuffer.getCurrentWALFileVersion();
IMemTable targetMemTable = new PrimitiveMemTable();
WALEntry walEntry =
- new WALEntry(targetMemTable.getMemTableId(), getInsertRowPlan(DEVICE2_NAME, 4L), true);
+ new WALInfoEntry(targetMemTable.getMemTableId(), getInsertRowPlan(DEVICE2_NAME, 4L), true);
walBuffer.write(walEntry);
walEntry.getWalFlushListener().waitForResult();
// write .checkpoint file
@@ -236,7 +237,7 @@ public class WALRecoverManagerTest {
try {
while (walBuffer.getCurrentWALFileVersion() - firstWALVersionId < 2) {
WALEntry walEntry =
- new WALEntry(
+ new WALInfoEntry(
memTableId, getInsertTabletPlan(SG_NAME.concat("test_d" + memTableId)));
walBuffer.write(walEntry);
}
@@ -264,11 +265,11 @@ public class WALRecoverManagerTest {
InsertRowPlan insertRowPlan = getInsertRowPlan(DEVICE2_NAME, 4L);
targetMemTable.insert(insertRowPlan);
- WALEntry walEntry = new WALEntry(targetMemTable.getMemTableId(), insertRowPlan, true);
+ WALEntry walEntry = new WALInfoEntry(targetMemTable.getMemTableId(), insertRowPlan, true);
walBuffer.write(walEntry);
walEntry.getWalFlushListener().waitForResult();
- walEntry = new WALEntry(targetMemTable.getMemTableId(), targetMemTable, true);
+ walEntry = new WALInfoEntry(targetMemTable.getMemTableId(), targetMemTable, true);
walBuffer.write(walEntry);
walEntry.getWalFlushListener().waitForResult();
// write .checkpoint file
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
index f5ff449761..d6ebf64d70 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALInfoEntry;
import org.apache.iotdb.db.wal.utils.TsFileUtilsForRecoverTest;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@ -126,7 +127,7 @@ public class UnsealedTsFileRecoverPerformerTest {
new InsertRowPlan(
new PartialPath(DEVICE2_NAME), time, new String[] {"s1", "s2"}, dataTypes, columns);
int fakeMemTableId = 1;
- WALEntry walEntry = new WALEntry(fakeMemTableId, insertRowPlan);
+ WALEntry walEntry = new WALInfoEntry(fakeMemTableId, insertRowPlan);
// recover
tsFileResource = new TsFileResource(file);
// vsg processor is used to test IdTable, don't test IdTable here
@@ -181,7 +182,7 @@ public class UnsealedTsFileRecoverPerformerTest {
DeletePlan deletePlan =
new DeletePlan(Long.MIN_VALUE, Long.MAX_VALUE, new PartialPath(DEVICE2_NAME));
int fakeMemTableId = 1;
- WALEntry walEntry = new WALEntry(fakeMemTableId, deletePlan);
+ WALEntry walEntry = new WALInfoEntry(fakeMemTableId, deletePlan);
// recover
tsFileResource = new TsFileResource(file);
// vsg processor is used to test IdTable, don't test IdTable here
@@ -306,8 +307,8 @@ public class UnsealedTsFileRecoverPerformerTest {
insertTabletNode.markFailedMeasurement(0, new Exception());
int fakeMemTableId = 1;
- WALEntry walEntry1 = new WALEntry(fakeMemTableId++, insertRowNode);
- WALEntry walEntry2 = new WALEntry(fakeMemTableId, insertTabletNode);
+ WALEntry walEntry1 = new WALInfoEntry(fakeMemTableId++, insertRowNode);
+ WALEntry walEntry2 = new WALInfoEntry(fakeMemTableId, insertTabletNode);
// recover
tsFileResource = new TsFileResource(file);
// vsg processor is used to test IdTable, don't test IdTable here
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALByteBufferForTest.java b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALByteBufferForTest.java
index a46f783e73..765ed1a03e 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALByteBufferForTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALByteBufferForTest.java
@@ -69,6 +69,11 @@ public class WALByteBufferForTest implements IWALByteBufferView {
buffer.putDouble(value);
}
+ @Override
+ public int position() {
+ return buffer.position();
+ }
+
public ByteBuffer getBuffer() {
return buffer;
}
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 5e2836c5a6..d36601be1c 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -22,6 +22,7 @@ namespace java org.apache.iotdb.consensus.multileader.thrift
struct TLogBatch {
1: required binary data
+ 2: required bool fromWAL
}
struct TSyncLogReq {