You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/07/01 02:37:36 UTC
[iotdb] branch master updated: Fix the issues in MultiLeader log dispatcher (#6484)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new deb9732b43 Fix the issues in MultiLeader log dispatcher (#6484)
deb9732b43 is described below
commit deb9732b43c6f6dbb4e803bdc66a89c0f2911d1c
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Fri Jul 1 10:37:31 2022 +0800
Fix the issues in MultiLeader log dispatcher (#6484)
---
.../multileader/MultiLeaderServerImpl.java | 42 ++++-
.../multileader/logdispatcher/IndexController.java | 67 ++++---
.../multileader/logdispatcher/LogDispatcher.java | 80 ++++++--
.../multileader/wal/ConsensusReqReader.java | 6 +-
.../multileader/MultiLeaderConsensusTest.java | 209 ++-------------------
.../iotdb/consensus/multileader/RecoveryTest.java | 4 +-
.../logdispatcher/IndexControllerTest.java | 38 +---
.../multileader/logdispatcher/SyncStatusTest.java | 8 +-
.../multileader/util/FakeConsensusReqReader.java | 111 +++++++++++
.../consensus/multileader/util/RequestSets.java | 55 ++++++
.../consensus/multileader/util/TestEntry.java | 74 ++++++++
.../multileader/util/TestStateMachine.java | 93 +++++++++
.../java/org/apache/iotdb/db/tools/WalChecker.java | 15 +-
.../iotdb/db/wal/buffer/AbstractWALBuffer.java | 5 +
.../org/apache/iotdb/db/wal/buffer/IWALBuffer.java | 3 +
.../org/apache/iotdb/db/wal/node/WALFakeNode.java | 5 +
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 66 +++----
.../iotdb/db/wal/node/ConsensusReqReaderTest.java | 40 ++--
18 files changed, 574 insertions(+), 347 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 272c244a1e..93f9cf567b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -30,10 +30,10 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
-import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
-import org.apache.iotdb.consensus.ratis.Utils;
+import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.slf4j.Logger;
@@ -46,6 +46,7 @@ import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
public class MultiLeaderServerImpl {
@@ -57,7 +58,7 @@ public class MultiLeaderServerImpl {
private final IStateMachine stateMachine;
private final String storageDir;
private final List<Peer> configuration;
- private final IndexController controller;
+ private final AtomicLong index;
private final LogDispatcher logDispatcher;
private final MultiLeaderConfig config;
@@ -71,8 +72,6 @@ public class MultiLeaderServerImpl {
this.storageDir = storageDir;
this.thisNode = thisNode;
this.stateMachine = stateMachine;
- this.controller =
- new IndexController(storageDir, Utils.fromTEndPointToString(thisNode.getEndpoint()), true);
this.configuration = configuration;
if (configuration.isEmpty()) {
recoverConfiguration();
@@ -81,6 +80,11 @@ public class MultiLeaderServerImpl {
}
this.config = config;
this.logDispatcher = new LogDispatcher(this, clientManager);
+ // restart
+ ConsensusReqReader reader =
+ (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan());
+ long currentSearchIndex = reader.getCurrentSearchIndex();
+ this.index = new AtomicLong(currentSearchIndex);
}
public IStateMachine getStateMachine() {
@@ -104,8 +108,26 @@ public class MultiLeaderServerImpl {
synchronized (stateMachine) {
IndexedConsensusRequest indexedConsensusRequest =
buildIndexedConsensusRequestForLocalRequest(request);
+ if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) {
+ logger.info(
+ "DataRegion[{}]: index after build: safeIndex: {}, searchIndex: {}",
+ thisNode.getGroupId(),
+ indexedConsensusRequest.getSafelyDeletedSearchIndex(),
+ indexedConsensusRequest.getSearchIndex());
+ }
+ // TODO wal and memtable
TSStatus result = stateMachine.write(indexedConsensusRequest);
- logDispatcher.offer(indexedConsensusRequest);
+ if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logDispatcher.offer(indexedConsensusRequest);
+ } else {
+ logger.debug(
+ "{}: write operation failed. searchIndex: {}. Code: {}",
+ thisNode.getGroupId(),
+ indexedConsensusRequest.getSearchIndex(),
+ result.getCode());
+ index.decrementAndGet();
+ }
+
return result;
}
}
@@ -156,7 +178,7 @@ public class MultiLeaderServerImpl {
public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
IConsensusRequest request) {
return new IndexedConsensusRequest(
- controller.incrementAndGet(), getCurrentSafelyDeletedSearchIndex(), request);
+ index.incrementAndGet(), getCurrentSafelyDeletedSearchIndex(), request);
}
public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
@@ -170,7 +192,7 @@ public class MultiLeaderServerImpl {
* single copies, the current index is selected
*/
public long getCurrentSafelyDeletedSearchIndex() {
- return logDispatcher.getMinSyncIndex().orElseGet(controller::getCurrentIndex);
+ return logDispatcher.getMinSyncIndex().orElseGet(index::get);
}
public String getStorageDir() {
@@ -185,8 +207,8 @@ public class MultiLeaderServerImpl {
return configuration;
}
- public IndexController getController() {
- return controller;
+ public long getIndex() {
+ return index.get();
}
public MultiLeaderConfig getConfig() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
index 6a7396076d..064f6841e0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
@@ -30,6 +30,7 @@ import javax.annotation.concurrent.ThreadSafe;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/** An index controller class to balance the performance degradation of frequent disk I/O. */
@ThreadSafe
@@ -39,42 +40,56 @@ public class IndexController {
public static final int FLUSH_INTERVAL = 500;
- private volatile long lastFlushedIndex;
- private volatile long currentIndex;
+ private long lastFlushedIndex;
+ private long currentIndex;
+
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final String storageDir;
private final String prefix;
- // Indicates whether currentIndex needs to be incremented by FLUSH_INTERVAL interval after restart
- private final boolean incrementIntervalAfterRestart;
- public IndexController(String storageDir, String prefix, boolean incrementIntervalAfterRestart) {
+ public IndexController(String storageDir, String prefix) {
this.storageDir = storageDir;
this.prefix = prefix + '-';
- this.incrementIntervalAfterRestart = incrementIntervalAfterRestart;
restore();
}
- public synchronized long incrementAndGet() {
- currentIndex++;
- checkPersist();
- return currentIndex;
+ public long incrementAndGet() {
+ try {
+ lock.writeLock().lock();
+ currentIndex++;
+ checkPersist();
+ return currentIndex;
+ } finally {
+ lock.writeLock().unlock();
+ }
}
- public synchronized long updateAndGet(long index) {
- long newCurrentIndex = Math.max(currentIndex, index);
- logger.debug(
- "update index from currentIndex {} to {} for file prefix {} in {}",
- currentIndex,
- newCurrentIndex,
- prefix,
- storageDir);
- currentIndex = newCurrentIndex;
- checkPersist();
- return currentIndex;
+ public long updateAndGet(long index) {
+ try {
+ lock.writeLock().lock();
+ long newCurrentIndex = Math.max(currentIndex, index);
+ logger.debug(
+ "update index from currentIndex {} to {} for file prefix {} in {}",
+ currentIndex,
+ newCurrentIndex,
+ prefix,
+ storageDir);
+ currentIndex = newCurrentIndex;
+ checkPersist();
+ return currentIndex;
+ } finally {
+ lock.writeLock().unlock();
+ }
}
public long getCurrentIndex() {
- return currentIndex;
+ try {
+ lock.readLock().lock();
+ return currentIndex;
+ } finally {
+ lock.readLock().unlock();
+ }
}
@TestOnly
@@ -131,13 +146,7 @@ public class IndexController {
}
}
}
- if (incrementIntervalAfterRestart) {
- // prevent overlapping in case of failure
- currentIndex = lastFlushedIndex + FLUSH_INTERVAL;
- persist();
- } else {
- currentIndex = lastFlushedIndex;
- }
+ currentIndex = lastFlushedIndex;
} else {
versionFile = new File(directory, prefix + "0");
try {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 1d651a0ba5..ae60598061 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -122,7 +121,7 @@ public class LogDispatcher {
}
public class LogDispatcherThread implements Runnable {
-
+ private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
private final MultiLeaderConfig config;
private final Peer peer;
private final IndexController controller;
@@ -137,6 +136,9 @@ public class LogDispatcher {
(ConsensusReqReader) impl.getStateMachine().read(new GetConsensusReqReaderPlan());
private volatile boolean stopped = false;
+ private ConsensusReqReader.ReqIterator walEntryiterator;
+ private long iteratorIndex = 1;
+
public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
this.peer = peer;
this.config = config;
@@ -144,8 +146,9 @@ public class LogDispatcher {
new ArrayBlockingQueue<>(config.getReplication().getMaxPendingRequestNumPerNode());
this.controller =
new IndexController(
- impl.getStorageDir(), Utils.fromTEndPointToString(peer.getEndpoint()), false);
+ impl.getStorageDir(), Utils.fromTEndPointToString(peer.getEndpoint()));
this.syncStatus = new SyncStatus(controller, config);
+ this.walEntryiterator = reader.getReqIterator(iteratorIndex);
}
public IndexController getController() {
@@ -184,10 +187,14 @@ public class LogDispatcher {
while (!Thread.interrupted() && !stopped) {
while ((batch = getBatch()).isEmpty()) {
// we may block here if there is no requests in the queue
- bufferedRequest.add(pendingRequest.take());
- // If write pressure is low, we simply sleep a little to reduce the number of RPC
- if (pendingRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
- Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
+ IndexedConsensusRequest request =
+ pendingRequest.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
+ if (request != null) {
+ bufferedRequest.add(request);
+ // If write pressure is low, we simply sleep a little to reduce the number of RPC
+ if (pendingRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
+ Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
+ }
}
}
// we may block here if the synchronization pipeline is full
@@ -207,20 +214,36 @@ public class LogDispatcher {
PendingBatch batch;
List<TLogBatch> logBatches = new ArrayList<>();
long startIndex = syncStatus.getNextSendingIndex();
- long maxIndex = impl.getController().getCurrentIndex() + 1;
+ logger.debug("get batch. startIndex: {}", startIndex);
long endIndex;
if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
// Use drainTo instead of poll to reduce lock overhead
+ logger.debug(
+ "{} : pendingRequest Size: {}, bufferedRequest size: {}",
+ impl.getThisNode().getGroupId(),
+ pendingRequest.size(),
+ bufferedRequest.size());
pendingRequest.drainTo(
bufferedRequest,
config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
+ // remove all request that searchIndex < startIndex
+ Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
+ while (iterator.hasNext()) {
+ IndexedConsensusRequest request = iterator.next();
+ if (request.getSearchIndex() < startIndex) {
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
}
- if (bufferedRequest.isEmpty()) {
- // only execute this after a restart
- endIndex = constructBatchFromWAL(startIndex, maxIndex, logBatches);
+ if (bufferedRequest.isEmpty()) { // only execute this after a restart
+ endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1, logBatches);
batch = new PendingBatch(startIndex, endIndex, logBatches);
- logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch);
+ logger.debug(
+ "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch);
} else {
+ // Notice that prev searchIndex >= startIndex
Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
IndexedConsensusRequest prev = iterator.next();
// Prevents gap between logs. For example, some requests are not written into the queue when
@@ -245,7 +268,7 @@ public class LogDispatcher {
if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
batch = new PendingBatch(startIndex, endIndex, logBatches);
logger.debug(
- "{} : accumulated a {} from queue and wal",
+ "gap {} : accumulated a {} from queue and wal when gap",
impl.getThisNode().getGroupId(),
batch);
return batch;
@@ -283,15 +306,36 @@ public class LogDispatcher {
private long constructBatchFromWAL(
long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
+ logger.debug(
+ String.format(
+ "DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d, iteratorIndex: %d",
+ peer.getGroupId().getId(),
+ peer.getEndpoint().ip,
+ currentIndex,
+ maxIndex,
+ iteratorIndex));
+ if (iteratorIndex != currentIndex) {
+ walEntryiterator.skipTo(currentIndex);
+ iteratorIndex = currentIndex;
+ }
while (currentIndex < maxIndex
&& logBatches.size() < config.getReplication().getMaxRequestPerBatch()) {
- // TODO iterator
- IConsensusRequest data = reader.getReq(currentIndex++);
- if (data != null) {
- logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
+ logger.debug("construct from WAL for one Entry, index : {}", currentIndex);
+ try {
+ walEntryiterator.waitForNextReady();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("wait for next WAL entry is interrupted");
+ }
+ IndexedConsensusRequest data = walEntryiterator.next();
+ currentIndex = data.getSearchIndex();
+ iteratorIndex = currentIndex;
+ logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
+ if (currentIndex == maxIndex - 1) {
+ break;
}
}
- return currentIndex - 1;
+ return currentIndex;
}
private void constructBatchIndexedFromConsensusRequest(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
index 5880e5de2d..a39e3186a4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.consensus.multileader.wal;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import java.util.Iterator;
import java.util.List;
@@ -68,7 +69,7 @@ public interface ConsensusReqReader {
* @throws java.util.NoSuchElementException if the iteration has no more elements, wait a moment
* or call {@link this#waitForNextReady} for more elements
*/
- IConsensusRequest next();
+ IndexedConsensusRequest next();
/**
* Wait for the next element in the iteration ready, blocked until next element is available.
@@ -89,4 +90,7 @@ public interface ConsensusReqReader {
*/
void skipTo(long targetIndex);
}
+
+ /** Get current search index */
+ long getCurrentSearchIndex();
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
index 320d4d3d27..771b600c69 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
@@ -20,22 +20,15 @@
package org.apache.iotdb.consensus.multileader;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.ConsensusGroup;
-import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
-import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.consensus.multileader.util.TestEntry;
+import org.apache.iotdb.consensus.multileader.util.TestStateMachine;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
@@ -45,17 +38,11 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
public class MultiLeaderConsensusTest {
@@ -133,29 +120,19 @@ public class MultiLeaderConsensusTest {
servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
servers.get(2).addConsensusGroup(group.getGroupId(), group.getPeers());
- Assert.assertEquals(0, servers.get(0).getImpl(gid).getController().getCurrentIndex());
- Assert.assertEquals(0, servers.get(1).getImpl(gid).getController().getCurrentIndex());
- Assert.assertEquals(0, servers.get(2).getImpl(gid).getController().getCurrentIndex());
+ Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
+ Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
+ Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
for (int i = 0; i < IndexController.FLUSH_INTERVAL; i++) {
servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
servers.get(2).write(gid, new TestEntry(i, peers.get(2)));
- Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getController().getCurrentIndex());
- Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getController().getCurrentIndex());
- Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getController().getCurrentIndex());
+ Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
+ Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
+ Assert.assertEquals(i + 1, servers.get(2).getImpl(gid).getIndex());
}
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL,
- servers.get(0).getImpl(gid).getController().getLastFlushedIndex());
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL,
- servers.get(1).getImpl(gid).getController().getLastFlushedIndex());
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL,
- servers.get(2).getImpl(gid).getController().getLastFlushedIndex());
-
for (int i = 0; i < 3; i++) {
long start = System.currentTimeMillis();
while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex()
@@ -193,15 +170,9 @@ public class MultiLeaderConsensusTest {
Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL * 2,
- servers.get(0).getImpl(gid).getController().getCurrentIndex());
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL * 2,
- servers.get(1).getImpl(gid).getController().getCurrentIndex());
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL * 2,
- servers.get(2).getImpl(gid).getController().getCurrentIndex());
+ Assert.assertEquals(IndexController.FLUSH_INTERVAL, servers.get(0).getImpl(gid).getIndex());
+ Assert.assertEquals(IndexController.FLUSH_INTERVAL, servers.get(1).getImpl(gid).getIndex());
+ Assert.assertEquals(IndexController.FLUSH_INTERVAL, servers.get(2).getImpl(gid).getIndex());
for (int i = 0; i < 3; i++) {
long start = System.currentTimeMillis();
@@ -217,7 +188,7 @@ public class MultiLeaderConsensusTest {
Assert.assertEquals(
IndexController.FLUSH_INTERVAL,
- servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+ servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
Assert.assertEquals(
IndexController.FLUSH_INTERVAL,
servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
@@ -237,23 +208,16 @@ public class MultiLeaderConsensusTest {
servers.get(0).addConsensusGroup(group.getGroupId(), group.getPeers());
servers.get(1).addConsensusGroup(group.getGroupId(), group.getPeers());
- Assert.assertEquals(0, servers.get(0).getImpl(gid).getController().getCurrentIndex());
- Assert.assertEquals(0, servers.get(1).getImpl(gid).getController().getCurrentIndex());
+ Assert.assertEquals(0, servers.get(0).getImpl(gid).getIndex());
+ Assert.assertEquals(0, servers.get(1).getImpl(gid).getIndex());
for (int i = 0; i < IndexController.FLUSH_INTERVAL; i++) {
servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
servers.get(1).write(gid, new TestEntry(i, peers.get(1)));
- Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getController().getCurrentIndex());
- Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getController().getCurrentIndex());
+ Assert.assertEquals(i + 1, servers.get(0).getImpl(gid).getIndex());
+ Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getIndex());
}
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL,
- servers.get(0).getImpl(gid).getController().getLastFlushedIndex());
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL,
- servers.get(1).getImpl(gid).getController().getLastFlushedIndex());
-
Assert.assertEquals(0, servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
Assert.assertEquals(0, servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
@@ -266,13 +230,9 @@ public class MultiLeaderConsensusTest {
Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
Assert.assertEquals(peers, servers.get(2).getImpl(gid).getConfiguration());
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL * 2,
- servers.get(0).getImpl(gid).getController().getCurrentIndex());
- Assert.assertEquals(
- IndexController.FLUSH_INTERVAL * 2,
- servers.get(1).getImpl(gid).getController().getCurrentIndex());
- Assert.assertEquals(0, servers.get(2).getImpl(gid).getController().getCurrentIndex());
+ Assert.assertEquals(IndexController.FLUSH_INTERVAL, servers.get(0).getImpl(gid).getIndex());
+ Assert.assertEquals(IndexController.FLUSH_INTERVAL, servers.get(1).getImpl(gid).getIndex());
+ Assert.assertEquals(0, servers.get(2).getImpl(gid).getIndex());
for (int i = 0; i < 2; i++) {
long start = System.currentTimeMillis();
@@ -299,135 +259,4 @@ public class MultiLeaderConsensusTest {
Assert.assertEquals(stateMachines.get(0).getData(), stateMachines.get(1).getData());
Assert.assertEquals(stateMachines.get(2).getData(), stateMachines.get(1).getData());
}
-
- private static class TestEntry implements IConsensusRequest {
-
- private final int num;
- private final Peer peer;
-
- public TestEntry(int num, Peer peer) {
- this.num = num;
- this.peer = peer;
- }
-
- @Override
- public ByteBuffer serializeToByteBuffer() {
- try (PublicBAOS publicBAOS = new PublicBAOS();
- DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
- outputStream.writeInt(num);
- peer.serialize(outputStream);
- return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TestEntry testEntry = (TestEntry) o;
- return num == testEntry.num && Objects.equals(peer, testEntry.peer);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(num, peer);
- }
-
- @Override
- public String toString() {
- return "TestEntry{" + "num=" + num + ", peer=" + peer + '}';
- }
- }
-
- private static class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
-
- private final Set<IndexedConsensusRequest> requestSet = ConcurrentHashMap.newKeySet();
-
- public Set<IndexedConsensusRequest> getRequestSet() {
- return requestSet;
- }
-
- public Set<TestEntry> getData() {
- Set<TestEntry> data = new HashSet<>();
- requestSet.forEach(x -> data.add((TestEntry) x.getRequest()));
- return data;
- }
-
- @Override
- public void start() {}
-
- @Override
- public void stop() {}
-
- @Override
- public TSStatus write(IConsensusRequest request) {
- synchronized (requestSet) {
- IConsensusRequest innerRequest = ((IndexedConsensusRequest) request).getRequest();
- if (innerRequest instanceof ByteBufferConsensusRequest) {
- ByteBuffer buffer = innerRequest.serializeToByteBuffer();
- requestSet.add(
- new IndexedConsensusRequest(
- ((IndexedConsensusRequest) request).getSearchIndex(),
- -1,
- new TestEntry(buffer.getInt(), Peer.deserialize(buffer))));
- } else {
- requestSet.add(((IndexedConsensusRequest) request));
- }
- return new TSStatus();
- }
- }
-
- @Override
- public synchronized DataSet read(IConsensusRequest request) {
- if (request instanceof GetConsensusReqReaderPlan) {
- return new FakeConsensusReqReader(requestSet);
- }
- return null;
- }
-
- @Override
- public boolean takeSnapshot(File snapshotDir) {
- return false;
- }
-
- @Override
- public void loadSnapshot(File latestSnapshotRootDir) {}
- }
-
- public static class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
-
- private final Set<IndexedConsensusRequest> requestSet;
-
- public FakeConsensusReqReader(Set<IndexedConsensusRequest> requestSet) {
- this.requestSet = requestSet;
- }
-
- @Override
- public IConsensusRequest getReq(long index) {
- synchronized (requestSet) {
- for (IndexedConsensusRequest indexedConsensusRequest : requestSet) {
- if (indexedConsensusRequest.getSearchIndex() == index) {
- return indexedConsensusRequest;
- }
- }
- return null;
- }
- }
-
- @Override
- public List<IConsensusRequest> getReqs(long startIndex, int num) {
- return null;
- }
-
- @Override
- public ReqIterator getReqIterator(long startIndex) {
- return null;
- }
- }
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
index b9000b7337..fb28a78058 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/RecoveryTest.java
@@ -23,11 +23,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.consensus.EmptyStateMachine;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.multileader.util.TestStateMachine;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
@@ -52,7 +52,7 @@ public class RecoveryTest {
.setThisNode(new TEndPoint("0.0.0.0", 9000))
.setStorageDir("target" + java.io.File.separator + "recovery")
.build(),
- gid -> new EmptyStateMachine())
+ gid -> new TestStateMachine())
.orElseThrow(
() ->
new IllegalArgumentException(
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
index c9bf6051b0..aa6f63783f 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
@@ -43,38 +43,10 @@ public class IndexControllerTest {
FileUtils.deleteFully(storageDir);
}
- /** test indexController when incrementIntervalAfterRestart == true */
- @Test
- public void testTrueIncrementIntervalAfterRestart() {
- IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
- Assert.assertEquals(0, controller.getCurrentIndex());
- Assert.assertEquals(0, controller.getLastFlushedIndex());
-
- for (int i = 0; i < IndexController.FLUSH_INTERVAL - 1; i++) {
- controller.incrementAndGet();
- }
- Assert.assertEquals(IndexController.FLUSH_INTERVAL - 1, controller.getCurrentIndex());
- Assert.assertEquals(0, controller.getLastFlushedIndex());
-
- controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
- Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getCurrentIndex());
- Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getLastFlushedIndex());
-
- for (int i = 0; i < IndexController.FLUSH_INTERVAL + 1; i++) {
- controller.incrementAndGet();
- }
- Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2 + 1, controller.getCurrentIndex());
- Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2, controller.getLastFlushedIndex());
-
- controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
- Assert.assertEquals(IndexController.FLUSH_INTERVAL * 3, controller.getCurrentIndex());
- Assert.assertEquals(IndexController.FLUSH_INTERVAL * 3, controller.getLastFlushedIndex());
- }
-
/** test indexController when incrementIntervalAfterRestart == false */
@Test
- public void testFalseIncrementIntervalAfterRestart() {
- IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, false);
+ public void testIncrementIntervalAfterRestart() {
+ IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -83,7 +55,7 @@ public class IndexControllerTest {
Assert.assertEquals(IndexController.FLUSH_INTERVAL - 1, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix, false);
+ controller = new IndexController(storageDir.getAbsolutePath(), prefix);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -91,7 +63,7 @@ public class IndexControllerTest {
Assert.assertEquals(IndexController.FLUSH_INTERVAL + 1, controller.getCurrentIndex());
Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix, false);
+ controller = new IndexController(storageDir.getAbsolutePath(), prefix);
Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getCurrentIndex());
Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getLastFlushedIndex());
@@ -99,7 +71,7 @@ public class IndexControllerTest {
Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2 - 1, controller.getCurrentIndex());
Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix, false);
+ controller = new IndexController(storageDir.getAbsolutePath(), prefix);
Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getCurrentIndex());
Assert.assertEquals(IndexController.FLUSH_INTERVAL, controller.getLastFlushedIndex());
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index cbf015257e..d243a80e77 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -54,7 +54,7 @@ public class SyncStatusTest {
/** Confirm success from front to back */
@Test
public void sequenceTest() throws InterruptedException {
- IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
+ IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix);
Assert.assertEquals(0, controller.getCurrentIndex());
SyncStatus status = new SyncStatus(controller, config);
@@ -79,7 +79,7 @@ public class SyncStatusTest {
/** Confirm success from back to front */
@Test
public void reverseTest() throws InterruptedException {
- IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
+ IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -111,7 +111,7 @@ public class SyncStatusTest {
/** Confirm success first from front to back, then back to front */
@Test
public void mixedTest() throws InterruptedException {
- IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
+ IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -153,7 +153,7 @@ public class SyncStatusTest {
/** Test Blocking while addNextBatch */
@Test
public void waitTest() throws InterruptedException, ExecutionException {
- IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix, true);
+ IndexController controller = new IndexController(storageDir.getAbsolutePath(), prefix);
Assert.assertEquals(0, controller.getCurrentIndex());
SyncStatus status = new SyncStatus(controller, config);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
new file mode 100644
index 0000000000..01d90b62cc
--- /dev/null
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.util;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test double for {@link ConsensusReqReader} that answers lookups by scanning the requests held
+ * in a shared {@link RequestSets}.
+ */
+public class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
+
+  /** Backing store; its monitor is taken before every scan. */
+  private final RequestSets requestSets;
+
+  public FakeConsensusReqReader(RequestSets requestSets) {
+    this.requestSets = requestSets;
+  }
+
+  /**
+   * Linear scan of the stored requests for the one whose search index equals {@code
+   * searchIndex}. Callers in this class hold the monitor of {@code requestSets} while calling.
+   *
+   * @return the matching request, or {@code null} when no stored request has that index
+   */
+  private IndexedConsensusRequest findBySearchIndex(long searchIndex) {
+    for (IndexedConsensusRequest candidate : requestSets.getRequestSet()) {
+      if (candidate.getSearchIndex() == searchIndex) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /** Returns the stored request with the given search index, or {@code null} if absent. */
+  @Override
+  public IConsensusRequest getReq(long index) {
+    synchronized (requestSets) {
+      return findBySearchIndex(index);
+    }
+  }
+
+  /** Batch lookup is not implemented by this fake; it always yields {@code null}. */
+  @Override
+  public List<IConsensusRequest> getReqs(long startIndex, int num) {
+    return null;
+  }
+
+  /** Creates an iterator whose first lookup targets {@code startIndex}. */
+  @Override
+  public ReqIterator getReqIterator(long startIndex) {
+    return new FakeConsensusReqIterator(startIndex);
+  }
+
+  /** Reports the local request counter kept by the backing {@link RequestSets}. */
+  @Override
+  public long getCurrentSearchIndex() {
+    return requestSets.getLocalRequestNumber();
+  }
+
+  /** Cursor over the backing store, addressed by search index. */
+  private class FakeConsensusReqIterator implements ConsensusReqReader.ReqIterator {
+    private long nextSearchIndex;
+
+    public FakeConsensusReqIterator(long startIndex) {
+      nextSearchIndex = startIndex;
+    }
+
+    /** Always {@code true}; a missing entry surfaces as a {@code null} from {@link #next()}. */
+    @Override
+    public boolean hasNext() {
+      return true;
+    }
+
+    /**
+     * Looks up the request at the current cursor position.
+     *
+     * @return the request, or {@code null} when it is not stored (the cursor then stays put)
+     */
+    @Override
+    public IndexedConsensusRequest next() {
+      synchronized (requestSets) {
+        IndexedConsensusRequest found = findBySearchIndex(nextSearchIndex);
+        if (found != null) {
+          // advance only after the expected entry has actually been located
+          nextSearchIndex++;
+        }
+        return found;
+      }
+    }
+
+    @Override
+    public void waitForNextReady() throws InterruptedException {
+      // hasNext() never reports false, so this loop body is not entered
+      while (!hasNext()) {
+        requestSets.waitForNextReady();
+      }
+    }
+
+    @Override
+    public void waitForNextReady(long time, TimeUnit unit)
+        throws InterruptedException, TimeoutException {
+      // same as the untimed variant: the condition is false on the first check
+      while (!hasNext()) {
+        requestSets.waitForNextReady(time, unit);
+      }
+    }
+
+    /** Repositions the cursor so the following {@link #next()} looks for {@code targetIndex}. */
+    @Override
+    public void skipTo(long targetIndex) {
+      nextSearchIndex = targetIndex;
+    }
+  }
+}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/RequestSets.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/RequestSets.java
new file mode 100644
index 0000000000..5b16573280
--- /dev/null
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/RequestSets.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.util;
+
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/** Test helper that keeps every added request in a set and counts the ones flagged as local. */
+public class RequestSets {
+  private final Set<IndexedConsensusRequest> requestSet;
+  private long localRequestNumber = 0;
+
+  public RequestSets(Set<IndexedConsensusRequest> requests) {
+    requestSet = requests;
+  }
+
+  /**
+   * Stores {@code request} in the backing set.
+   *
+   * @param request the request to remember
+   * @param local when {@code true}, the local request counter is advanced by one first
+   */
+  public void add(IndexedConsensusRequest request, boolean local) {
+    localRequestNumber += local ? 1 : 0;
+    requestSet.add(request);
+  }
+
+  /** Exposes the backing set itself, not a copy. */
+  public Set<IndexedConsensusRequest> getRequestSet() {
+    return requestSet;
+  }
+
+  /** No-op: returns immediately without waiting. */
+  public void waitForNextReady() throws InterruptedException {}
+
+  /** No-op: ignores both arguments and immediately reports {@code true}. */
+  public boolean waitForNextReady(long time, TimeUnit unit) throws InterruptedException {
+    return true;
+  }
+
+  /** Number of {@link #add} calls so far that were flagged as local. */
+  public long getLocalRequestNumber() {
+    return localRequestNumber;
+  }
+}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java
new file mode 100644
index 0000000000..214af5a8d6
--- /dev/null
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestEntry.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.util;
+
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/** Minimal consensus request for tests: an integer payload tagged with a {@link Peer}. */
+public class TestEntry implements IConsensusRequest {
+
+  private final int num;
+  private final Peer peer;
+
+  public TestEntry(int num, Peer peer) {
+    this.num = num;
+    this.peer = peer;
+  }
+
+  /**
+   * Writes {@code num} as an int followed by the serialized peer.
+   *
+   * @return a buffer wrapping exactly the bytes that were written
+   * @throws RuntimeException wrapping any {@link IOException} raised while writing
+   */
+  @Override
+  public ByteBuffer serializeToByteBuffer() {
+    try (PublicBAOS bytes = new PublicBAOS();
+        DataOutputStream out = new DataOutputStream(bytes)) {
+      out.writeInt(num);
+      peer.serialize(out);
+      // wrap only the written prefix of the backing array
+      return ByteBuffer.wrap(bytes.getBuf(), 0, bytes.size());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Two entries are equal when they have the same class, the same number and equal peers. */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    TestEntry other = (TestEntry) o;
+    if (num != other.num) {
+      return false;
+    }
+    return Objects.equals(peer, other.peer);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(num, peer);
+  }
+
+  @Override
+  public String toString() {
+    return "TestEntry{num=" + num + ", peer=" + peer + "}";
+  }
+}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
new file mode 100644
index 0000000000..a7803ba689
--- /dev/null
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.util;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * State machine used by the tests: every written request is recorded in a {@link RequestSets},
+ * and a {@link FakeConsensusReqReader} over that record is handed out on request.
+ */
+public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
+
+  private final RequestSets requestSets = new RequestSets(ConcurrentHashMap.newKeySet());
+
+  /** Returns the live set of recorded requests. */
+  public Set<IndexedConsensusRequest> getRequestSet() {
+    return requestSets.getRequestSet();
+  }
+
+  /** Collects the inner request of every recorded entry, cast to {@link TestEntry}. */
+  public Set<TestEntry> getData() {
+    Set<TestEntry> entries = new HashSet<>();
+    for (IndexedConsensusRequest recorded : requestSets.getRequestSet()) {
+      entries.add((TestEntry) recorded.getRequest());
+    }
+    return entries;
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public void stop() {}
+
+  /**
+   * Records {@code request}, which must be an {@link IndexedConsensusRequest}.
+   *
+   * <p>When the wrapped request is a {@link ByteBufferConsensusRequest}, its bytes are decoded
+   * back into a {@link TestEntry} and stored under the same search index with the local flag
+   * off; any other wrapped request is stored as-is with the local flag on.
+   *
+   * @return a status built from {@code TSStatusCode.SUCCESS_STATUS}
+   */
+  @Override
+  public TSStatus write(IConsensusRequest request) {
+    synchronized (requestSets) {
+      IndexedConsensusRequest indexed = (IndexedConsensusRequest) request;
+      IConsensusRequest payload = indexed.getRequest();
+      if (!(payload instanceof ByteBufferConsensusRequest)) {
+        requestSets.add(indexed, true);
+      } else {
+        ByteBuffer buffer = payload.serializeToByteBuffer();
+        long searchIndex = indexed.getSearchIndex();
+        // decode in the order the fields were written: the int first, then the peer
+        int num = buffer.getInt();
+        Peer peer = Peer.deserialize(buffer);
+        requestSets.add(
+            new IndexedConsensusRequest(searchIndex, -1, new TestEntry(num, peer)), false);
+      }
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    }
+  }
+
+  /**
+   * Answers a {@link GetConsensusReqReaderPlan} with a new reader over the recorded requests.
+   *
+   * @return the reader, or {@code null} for every other kind of request
+   */
+  @Override
+  public synchronized DataSet read(IConsensusRequest request) {
+    return request instanceof GetConsensusReqReaderPlan
+        ? new FakeConsensusReqReader(requestSets)
+        : null;
+  }
+
+  /** Snapshots are not supported here; always reports {@code false}. */
+  @Override
+  public boolean takeSnapshot(File snapshotDir) {
+    return false;
+  }
+
+  @Override
+  public void loadSnapshot(File latestSnapshotRootDir) {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java b/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
index f982aae06b..9c4da773e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.tools;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.exception.SystemCheckException;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.wal.buffer.WALEntry;
import org.apache.iotdb.db.wal.utils.WALFileUtils;
@@ -35,6 +36,7 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -93,6 +95,12 @@ public class WalChecker {
while (logStream.available() > 0) {
WALEntry walEntry = WALEntry.deserialize(logStream);
totalSize += walEntry.serializedSize();
+ if (walEntry.getValue() instanceof InsertTabletNode) {
+ InsertTabletNode insertNode = (InsertTabletNode) walEntry.getValue();
+ System.err.printf(
+ "searchIndex: %s, timestamp: %s%n",
+ insertNode.getSearchIndex(), Arrays.toString(insertNode.getTimes()));
+ }
}
} catch (EOFException e) {
if (totalSize == walFile.length()) {
@@ -122,12 +130,17 @@ public class WalChecker {
-  /** @param args walRootDirectory */
+  /**
+   * Command-line entry point: builds a checker for the directory named by the first argument,
+   * runs {@code doCheck()} and hands the returned files to {@code report}.
+   *
+   * @param args args[0] is the walRootDirectory; when it is missing an error is logged and the
+   *     method returns without checking anything
+   */
   public static void main(String[] args) throws SystemCheckException {
     if (args.length < 1) {
       logger.error("No enough args: require the walRootDirectory");
       return;
     }

     WalChecker checker = new WalChecker(args[0]);
     List<File> files = checker.doCheck();
     report(files);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
index 51d0ff33ae..e73f053469 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
@@ -103,4 +103,9 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
currentWALFileWriter = new WALWriter(nextLogFile);
logger.debug("Open new wal file {} for wal node-{}'s buffer.", nextLogFile, identifier);
}
+
+ /** Returns the value of this buffer's {@code currentSearchIndex} field. */
+ @Override
+ public long getCurrentSearchIndex() {
+ return currentSearchIndex;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
index b5a43b0cd0..ca76b89fcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
@@ -40,6 +40,9 @@ public interface IWALBuffer extends AutoCloseable {
/** Get current wal file's size */
long getCurrentWALFileSize();
+ /** Get current search index */
+ long getCurrentSearchIndex();
+
@Override
void close();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
index 507d02622a..54a1ba09a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
@@ -119,6 +119,11 @@ public class WALFakeNode implements IWALNode {
// do nothing
}
+ /** Not supported by this node: always throws {@link UnsupportedOperationException}. */
+ @Override
+ public long getCurrentSearchIndex() {
+ throw new UnsupportedOperationException();
+ }
+
public static WALFakeNode getFailureInstance(Exception e) {
return new WALFakeNode(
WALFlushListener.Status.FAILURE,
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 6827a97de2..1709ee22ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
@@ -77,7 +78,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
-import static org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode.NO_CONSENSUS_INDEX;
/**
* This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}. If search is enabled,
@@ -135,8 +135,7 @@ public class WALNode implements IWALNode {
@Override
public WALFlushListener log(long memTableId, InsertRowNode insertRowNode) {
- if (insertRowNode.getSearchIndex() != NO_CONSENSUS_INDEX
- && insertRowNode.getSafelyDeletedSearchIndex() != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
+ if (insertRowNode.getSafelyDeletedSearchIndex() != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
safelyDeletedSearchIndex = insertRowNode.getSafelyDeletedSearchIndex();
}
WALEntry walEntry = new WALEntry(memTableId, insertRowNode);
@@ -153,8 +152,7 @@ public class WALNode implements IWALNode {
@Override
public WALFlushListener log(
long memTableId, InsertTabletNode insertTabletNode, int start, int end) {
- if (insertTabletNode.getSearchIndex() != NO_CONSENSUS_INDEX
- && insertTabletNode.getSafelyDeletedSearchIndex() != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
+ if (insertTabletNode.getSafelyDeletedSearchIndex() != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
safelyDeletedSearchIndex = insertTabletNode.getSafelyDeletedSearchIndex();
}
WALEntry walEntry = new WALEntry(memTableId, insertTabletNode, start, end);
@@ -237,7 +235,7 @@ public class WALNode implements IWALNode {
}
}
- logger.debug(
+ logger.info(
"Start deleting outdated wal files for wal node-{}, the first valid version id is {}, and the safely deleted search index is {}.",
identifier,
firstValidVersionId,
@@ -246,11 +244,6 @@ public class WALNode implements IWALNode {
// delete outdated files
deleteOutdatedFiles();
- // wal is used to search, cannot optimize files deletion
- if (safelyDeletedSearchIndex != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
- return;
- }
-
// calculate effective information ratio
long costOfActiveMemTables = checkpointManager.getTotalCostOfActiveMemTables();
long costOfFlushedMemTables = totalCostOfFlushedMemTables.get();
@@ -276,6 +269,10 @@ public class WALNode implements IWALNode {
identifier,
config.getWalMinEffectiveInfoRatio());
if (snapshotOrFlushMemTable() && recursionTime < MAX_RECURSION_TIME) {
+ // wal is used to search, cannot optimize files deletion
+ if (safelyDeletedSearchIndex != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
+ return;
+ }
run();
recursionTime++;
}
@@ -291,8 +288,10 @@ public class WALNode implements IWALNode {
}
// delete files whose content's search index are all <= safelyDeletedSearchIndex
WALFileUtils.ascSortByVersionId(filesToDelete);
+ // judge DEFAULT_SAFELY_DELETED_SEARCH_INDEX for standalone, Long.MIN_VALUE for multi-leader
int endFileIndex =
safelyDeletedSearchIndex == DEFAULT_SAFELY_DELETED_SEARCH_INDEX
+ || safelyDeletedSearchIndex == Long.MIN_VALUE
? filesToDelete.length
: WALFileUtils.binarySearchFileBySearchIndex(
filesToDelete, safelyDeletedSearchIndex + 1);
@@ -670,11 +669,6 @@ public class WALNode implements IWALNode {
private int currentFileIndex = -1;
/** true means filesToSearch and currentFileIndex are outdated, call updateFilesToSearch */
private boolean needUpdatingFilesToSearch = true;
- /**
- * files whose version id before this value have already been searched, avoid storing too many
- * files in filesToSearch
- */
- private long searchedFilesVersionId = 0;
/** batch store insert nodes */
private final List<InsertNode> insertNodes = new LinkedList<>();
/** iterator of insertNodes */
@@ -698,6 +692,17 @@ public class WALNode implements IWALNode {
if (needUpdatingFilesToSearch || filesToSearch == null) {
updateFilesToSearch();
if (needUpdatingFilesToSearch) {
+ logger.warn("update file to search failed");
+ return false;
+ }
+ }
+
+ // find file contains search index
+ while (WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
+ == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+ currentFileIndex++;
+ if (currentFileIndex >= filesToSearch.length) {
+ needUpdatingFilesToSearch = true;
return false;
}
}
@@ -807,9 +812,6 @@ public class WALNode implements IWALNode {
// update file index and version id
if (currentFileIndex >= filesToSearch.length) {
needUpdatingFilesToSearch = true;
- } else {
- searchedFilesVersionId =
- WALFileUtils.parseVersionId(filesToSearch[currentFileIndex].getName());
}
// update iterator
@@ -821,7 +823,7 @@ public class WALNode implements IWALNode {
}
@Override
- public IConsensusRequest next() {
+ public IndexedConsensusRequest next() {
if (itr == null && !hasNext()) {
throw new NoSuchElementException();
}
@@ -846,7 +848,7 @@ public class WALNode implements IWALNode {
String.format("Search index of wal node-%s are out of order", identifier));
}
- return insertNode;
+ return new IndexedConsensusRequest(insertNode.getSearchIndex(), -1, insertNode);
}
@Override
@@ -882,7 +884,6 @@ public class WALNode implements IWALNode {
/** Reset all params except nextSearchIndex */
private void reset() {
- searchedFilesVersionId = -1;
insertNodes.clear();
itr = null;
filesToSearch = null;
@@ -891,14 +892,14 @@ public class WALNode implements IWALNode {
}
private void updateFilesToSearch() {
- File[] filesToSearch = logDirectory.listFiles(this::filterFilesToSearch);
+ File[] filesToSearch = WALFileUtils.listAllWALFiles(logDirectory);
WALFileUtils.ascSortByVersionId(filesToSearch);
int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(filesToSearch, nextSearchIndex);
+ logger.debug(
+ "searchIndex: {}, result: {}, files: {}, ", nextSearchIndex, fileIndex, filesToSearch);
if (filesToSearch != null && fileIndex >= 0) { // possible to find next
this.filesToSearch = filesToSearch;
this.currentFileIndex = fileIndex;
- this.searchedFilesVersionId =
- WALFileUtils.parseVersionId(this.filesToSearch[currentFileIndex].getName());
this.needUpdatingFilesToSearch = false;
} else { // impossible to find next
this.filesToSearch = null;
@@ -906,18 +907,13 @@ public class WALNode implements IWALNode {
this.needUpdatingFilesToSearch = true;
}
}
+ }
- private boolean filterFilesToSearch(File dir, String name) {
- Pattern pattern = WALFileUtils.WAL_FILE_NAME_PATTERN;
- Matcher matcher = pattern.matcher(name);
- boolean toSearch = false;
- if (matcher.find()) {
- long versionId = Long.parseLong(matcher.group(IoTDBConstant.WAL_VERSION_ID));
- toSearch = versionId >= searchedFilesVersionId;
- }
- return toSearch;
- }
+ /** Delegates to {@code buffer.getCurrentSearchIndex()} and returns its result unchanged. */
+ @Override
+ public long getCurrentSearchIndex() {
+ return buffer.getCurrentSearchIndex();
}
+
// endregion
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
index e4603b7607..3350894304 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
@@ -248,27 +248,27 @@ public class ConsensusReqReaderTest {
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertRowNode);
Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
Assert.assertEquals(
3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertRowsNode);
Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertMultiTabletsNode);
Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertTabletNode);
Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
Assert.assertFalse(iterator.hasNext());
@@ -281,12 +281,12 @@ public class ConsensusReqReaderTest {
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(4);
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertMultiTabletsNode);
Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertTabletNode);
Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
@@ -298,7 +298,8 @@ public class ConsensusReqReaderTest {
() -> {
iterator.waitForNextReady();
Assert.assertTrue(iterator.hasNext());
- IConsensusRequest req = iterator.next();
+ IConsensusRequest req = iterator.next().getRequest();
+ ;
Assert.assertTrue(req instanceof InsertRowNode);
Assert.assertEquals(6, ((InsertRowNode) req).getSearchIndex());
return true;
@@ -317,7 +318,7 @@ public class ConsensusReqReaderTest {
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertTabletNode);
Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
@@ -329,7 +330,8 @@ public class ConsensusReqReaderTest {
() -> {
iterator.waitForNextReady();
Assert.assertTrue(iterator.hasNext());
- IConsensusRequest req = iterator.next();
+ IConsensusRequest req = iterator.next().getRequest();
+ ;
Assert.assertTrue(req instanceof InsertRowNode);
Assert.assertEquals(6, ((InsertRowNode) req).getSearchIndex());
return true;
@@ -349,11 +351,11 @@ public class ConsensusReqReaderTest {
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertRowNode);
Assert.assertEquals(1, ((InsertRowNode) request).getSearchIndex());
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
Assert.assertEquals(
@@ -362,12 +364,12 @@ public class ConsensusReqReaderTest {
iterator.skipTo(4);
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertMultiTabletsNode);
Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertTabletNode);
Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
Assert.assertFalse(iterator.hasNext());
@@ -380,30 +382,30 @@ public class ConsensusReqReaderTest {
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertTabletNode);
Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
iterator.skipTo(2);
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertRowsOfOneDeviceNode);
Assert.assertEquals(2, ((InsertRowsOfOneDeviceNode) request).getSearchIndex());
Assert.assertEquals(
3, ((InsertRowsOfOneDeviceNode) request).getInsertRowNodeIndexList().size());
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertRowsNode);
Assert.assertEquals(3, ((InsertRowsNode) request).getSearchIndex());
Assert.assertEquals(3, ((InsertRowsNode) request).getInsertRowNodeIndexList().size());
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertMultiTabletsNode);
Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
Assert.assertTrue(iterator.hasNext());
- request = iterator.next();
+ request = iterator.next().getRequest();
Assert.assertTrue(request instanceof InsertTabletNode);
Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
Assert.assertFalse(iterator.hasNext());