You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by sp...@apache.org on 2023/02/02 06:07:14 UTC
[iotdb] branch master updated: [IOTDB-5257] Optimize SyncLog Logic in IoT Consensus (#8863)
This is an automated email from the ASF dual-hosted git repository.
spricoder pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5c06988797 [IOTDB-5257] Optimize SyncLog Logic in IoT Consensus (#8863)
5c06988797 is described below
commit 5c069887972f8332b10df41e2f1634b41d8de338
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Thu Feb 2 14:07:08 2023 +0800
[IOTDB-5257] Optimize SyncLog Logic in IoT Consensus (#8863)
* Refactor syncLog Logic
* Fix the creation of the state machine
* Optimize comment
* Fix deserialize
* Fix format
* Fix the implementation of getPlanNode
* Optimize implementation
* move base implementation to BaseStateMachine
* Add deserialize
---
.../statemachine/ConfigNodeRegionStateMachine.java | 20 +-
.../org/apache/iotdb/consensus/IStateMachine.java | 39 ++--
.../DeserializedBatchIndexedConsensusRequest.java | 65 ++++++
.../iotdb/consensus/config/IoTConsensusConfig.java | 15 ++
.../consensus/iot/IoTConsensusServerImpl.java | 122 +++++++++-
.../iot/client/IoTConsensusClientPool.java | 4 +-
.../iot/logdispatcher/IndexController.java | 3 +-
.../consensus/iot/logdispatcher/LogDispatcher.java | 4 +-
.../consensus/iot/logdispatcher/SyncStatus.java | 4 +-
.../service/IoTConsensusRPCServiceProcessor.java | 6 +-
.../ratis/ApplicationStateMachineProxy.java | 4 +-
.../iotdb/consensus/simple/SimpleServerImpl.java | 5 +
.../apache/iotdb/consensus/EmptyStateMachine.java | 5 +
.../iotdb/consensus/iot/util/TestStateMachine.java | 29 ++-
.../apache/iotdb/consensus/ratis/TestUtils.java | 13 +-
.../consensus/simple/SimpleConsensusTest.java | 5 +
.../db/consensus/DataRegionConsensusImpl.java | 20 +-
.../consensus/statemachine/BaseStateMachine.java | 9 +-
.../statemachine/DataRegionStateMachine.java | 245 ++++-----------------
.../IoTConsensusDataRegionStateMachine.java | 111 ++++++++++
.../statemachine/SchemaRegionStateMachine.java | 11 +-
21 files changed, 484 insertions(+), 255 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
index db16cb9c83..16d75de30b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
@@ -49,6 +49,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
+import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -96,28 +97,35 @@ public class ConfigNodeRegionStateMachine
@Override
public TSStatus write(IConsensusRequest request) {
- ConfigPhysicalPlan plan;
+ // Accept both raw and already-deserialized requests: a caller that skips
+ // deserializeRequest() gets an error status instead of a ClassCastException.
+ return Optional.ofNullable(request)
+ .map(r -> r instanceof ConfigPhysicalPlan ? r : deserializeRequest(r))
+ .filter(ConfigPhysicalPlan.class::isInstance)
+ .map(plan -> write((ConfigPhysicalPlan) plan))
+ .orElseGet(() -> new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+ }
+
+ /**
+ * Convert a request into a {@link ConfigPhysicalPlan}.
+ *
+ * @param request a {@link ByteBufferConsensusRequest} to deserialize, or a {@link
+ * ConfigPhysicalPlan} which is returned unchanged
+ * @return the plan, or {@code null} if the request is null, cannot be deserialized, or has an
+ * unexpected type (the error is logged in every case)
+ */
+ @Override
+ public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+ if (request == null) {
+ // guard: the unexpected-type branch below would dereference the request
+ LOGGER.error("Unexpected null write plan");
+ return null;
+ }
+ IConsensusRequest result;
if (request instanceof ByteBufferConsensusRequest) {
try {
- plan = ConfigPhysicalPlan.Factory.create(request.serializeToByteBuffer());
+ result = ConfigPhysicalPlan.Factory.create(request.serializeToByteBuffer());
} catch (Throwable e) {
LOGGER.error(
"Deserialization error for write plan, request: {}, bytebuffer: {}",
request,
request.serializeToByteBuffer(),
e);
- return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ return null;
}
} else if (request instanceof ConfigPhysicalPlan) {
- plan = (ConfigPhysicalPlan) request;
+ result = request;
} else {
LOGGER.error(
"Unexpected write plan, request: {}, bytebuffer: {}",
request,
request.serializeToByteBuffer());
- return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ return null;
}
- return write(plan);
+ return result;
}
/** Transmit PhysicalPlan to confignode.service.executor.PlanExecutor */
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index 75f50721f9..056c897aff 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -47,18 +47,26 @@ public interface IStateMachine {
}
/**
- * apply a write-request from user
+ * Apply a deserialized write request from the user.
+ *
+ * <p>Callers are expected to pass the result of {@link #deserializeRequest(IConsensusRequest)}
+ * rather than a raw request.
*
- * @param IConsensusRequest write request
+ * @param request deserialized request
+ * @return status of the write
*/
- TSStatus write(IConsensusRequest IConsensusRequest);
+ TSStatus write(IConsensusRequest request);
/**
- * read local data and return
+ * Deserialize an IConsensusRequest (for example a {@code ByteBufferConsensusRequest}) into the
+ * form accepted by {@link #write(IConsensusRequest)}.
*
- * @param IConsensusRequest read request
+ * @param request write request, possibly still in serialized form
+ * @return deserialized request; an implementation may return {@code null} when the request
+ * cannot be deserialized, so callers should be prepared to handle it
*/
- DataSet read(IConsensusRequest IConsensusRequest);
+ IConsensusRequest deserializeRequest(IConsensusRequest request);
+
+ /**
+ * Read local data and return it.
+ *
+ * @param request read request
+ * @return the read result
+ */
+ DataSet read(IConsensusRequest request);
/**
* Take a snapshot of current statemachine. All files are required to be stored under snapshotDir,
@@ -83,7 +91,7 @@ public interface IStateMachine {
}
/**
- * Load the latest snapshot from given dir
+ * Load the latest snapshot from given dir.
*
* @param latestSnapshotRootDir dir where the latest snapshot sits
*/
@@ -110,14 +118,13 @@ public interface IStateMachine {
* retry the operation until it succeed.
*/
interface RetryPolicy {
-
- /** Given the last write result, should we retry? */
+ /** Whether to retry, given the last write result. The default never retries. */
default boolean shouldRetry(TSStatus writeResult) {
return false;
}
/**
- * Use the latest write result to update final write result
+ * Use the latest write result to update final write result.
*
* @param previousResult previous write result
* @param retryResult latest write result
@@ -128,13 +135,13 @@ public interface IStateMachine {
}
/**
- * sleep time before the next retry
+ * Sleep time before the next retry. Defaults to 100 ms.
*
* @return time in millis
*/
default long getSleepTime() {
return 100;
- };
+ }
}
/** An optional API for event notifications. */
@@ -146,7 +153,9 @@ public interface IStateMachine {
* @param groupId The id of this consensus group.
* @param newLeaderId The id of the new leader node.
*/
- default void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) {}
+ default void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) {
+ // no-op by default; implementations override this to react to leader changes
+ }
/**
* Notify the {@link IStateMachine} a configuration change. This method will be invoked when a
@@ -156,7 +165,9 @@ public interface IStateMachine {
* @param index index which is being updated
* @param newConfiguration new configuration
*/
- default void notifyConfigurationChanged(long term, long index, List<Peer> newConfiguration) {}
+ default void notifyConfigurationChanged(long term, long index, List<Peer> newConfiguration) {
+ // no-op by default; implementations override this to react to configuration changes
+ }
}
/**
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
new file mode 100644
index 0000000000..abddbf25b2
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common.request;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A batch of consensus requests that the state machine has already deserialized, tagged with the
+ * range of sync indexes [startSyncIndex, endSyncIndex] it covers. Batches are ordered by
+ * startSyncIndex so that they can be buffered in a priority queue and applied in order.
+ */
+public class DeserializedBatchIndexedConsensusRequest
+    implements IConsensusRequest, Comparable<DeserializedBatchIndexedConsensusRequest> {
+
+  private final long startSyncIndex;
+  private final long endSyncIndex;
+  private final List<IConsensusRequest> requests;
+
+  /**
+   * @param startSyncIndex first sync index covered by this batch
+   * @param endSyncIndex last sync index covered by this batch
+   * @param size expected number of requests; used only to presize the backing list
+   */
+  public DeserializedBatchIndexedConsensusRequest(
+      long startSyncIndex, long endSyncIndex, int size) {
+    this.startSyncIndex = startSyncIndex;
+    this.endSyncIndex = endSyncIndex;
+    // the final count is known up front, so allocate the list at that capacity
+    this.requests = new ArrayList<>(size);
+  }
+
+  /** @return first sync index covered by this batch */
+  public long getStartSyncIndex() {
+    return this.startSyncIndex;
+  }
+
+  /** @return last sync index covered by this batch */
+  public long getEndSyncIndex() {
+    return this.endSyncIndex;
+  }
+
+  /** @return the (mutable) list of deserialized requests, in insertion order */
+  public List<IConsensusRequest> getInsertNodes() {
+    return this.requests;
+  }
+
+  /** Append one deserialized request to this batch. */
+  public void add(IConsensusRequest request) {
+    this.requests.add(request);
+  }
+
+  /** Orders batches by their start sync index only. */
+  @Override
+  public int compareTo(DeserializedBatchIndexedConsensusRequest other) {
+    return Long.compare(this.startSyncIndex, other.startSyncIndex);
+  }
+
+  // NOTE(review): returns null — this type is presumably never re-serialized; confirm before
+  // relying on it.
+  @Override
+  public ByteBuffer serializeToByteBuffer() {
+    return null;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index ae070852fc..9c676fdaa9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -235,6 +235,7 @@ public class IoTConsensusConfig {
private final int maxLogEntriesNumPerBatch;
private final int maxSizePerBatch;
private final int maxPendingBatchesNum;
+ // max time (ms) of a single wait of a SyncLog batch for its turn; see the getter below
+ private final long maxWaitingTimeForWaitBatchInMs;
private final int maxWaitingTimeForAccumulatingBatchInMs;
private final long basicRetryWaitTimeMs;
private final long maxRetryWaitTimeMs;
@@ -248,6 +249,7 @@ public class IoTConsensusConfig {
int maxLogEntriesNumPerBatch,
int maxSizePerBatch,
int maxPendingBatchesNum,
+ long maxWaitingTimeForWaitBatchInMs,
int maxWaitingTimeForAccumulatingBatchInMs,
long basicRetryWaitTimeMs,
long maxRetryWaitTimeMs,
@@ -259,6 +261,7 @@ public class IoTConsensusConfig {
this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
this.maxSizePerBatch = maxSizePerBatch;
this.maxPendingBatchesNum = maxPendingBatchesNum;
+ this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
@@ -281,6 +284,10 @@ public class IoTConsensusConfig {
return maxPendingBatchesNum;
}
+ /**
+ * Maximum time, in milliseconds, that a SyncLog batch waits in a single await for its turn to
+ * be written on the follower.
+ */
+ public long getMaxWaitingTimeForWaitBatchInMs() {
+ return maxWaitingTimeForWaitBatchInMs;
+ }
+
public int getMaxWaitingTimeForAccumulatingBatchInMs() {
return maxWaitingTimeForAccumulatingBatchInMs;
}
@@ -324,6 +331,7 @@ public class IoTConsensusConfig {
// (IMPORTANT) Value of this variable should be the same with MAX_REQUEST_CACHE_SIZE
// in DataRegionStateMachine
private int maxPendingBatchesNum = 5;
+ // 10 seconds by default; bounds a single wait of a SyncLog batch for its turn
+ private long maxWaitingTimeForWaitBatchInMs = 10 * 1000L;
private int maxWaitingTimeForAccumulatingBatchInMs = 500;
private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
@@ -348,6 +356,12 @@ public class IoTConsensusConfig {
return this;
}
+ /**
+ * Set the maximum time, in milliseconds, that a SyncLog batch waits in a single await for its
+ * turn to be written.
+ */
+ public Replication.Builder setMaxWaitingTimeForWaitBatchInMs(
+ long maxWaitingTimeForWaitBatchInMs) {
+ this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
+ return this;
+ }
+
public Replication.Builder setMaxWaitingTimeForAccumulatingBatchInMs(
int maxWaitingTimeForAccumulatingBatchInMs) {
this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
@@ -394,6 +408,7 @@ public class IoTConsensusConfig {
maxLogEntriesNumPerBatch,
maxSizePerBatch,
maxPendingBatchesNum,
+ maxWaitingTimeForWaitBatchInMs,
maxWaitingTimeForAccumulatingBatchInMs,
basicRetryWaitTimeMs,
maxRetryWaitTimeMs,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index f8a4316db9..e24478179a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
@@ -75,7 +76,10 @@ import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -93,6 +97,7 @@ public class IoTConsensusServerImpl {
private final Peer thisNode;
private final IStateMachine stateMachine;
+ // one SyncLog ordering queue per source peer id, created lazily by syncLog()
+ private final ConcurrentHashMap<String, SyncLogCacheQueue> cacheQueueMap;
private final Lock stateMachineLock = new ReentrantLock();
private final Condition stateMachineCondition = stateMachineLock.newCondition();
private final String storageDir;
@@ -121,6 +126,7 @@ public class IoTConsensusServerImpl {
this.storageDir = storageDir;
this.thisNode = thisNode;
this.stateMachine = stateMachine;
+ this.cacheQueueMap = new ConcurrentHashMap<>();
this.syncClientManager = syncClientManager;
this.configuration = configuration;
if (configuration.isEmpty()) {
@@ -155,7 +161,8 @@ public class IoTConsensusServerImpl {
}
/**
- * records the index of the log and writes locally, and then asynchronous replication is performed
+ * records the index of the log and writes locally, and then asynchronous replication is
+ * performed.
*/
public TSStatus write(IConsensusRequest request) {
long consensusWriteStartTime = System.currentTimeMillis();
@@ -216,7 +223,8 @@ public class IoTConsensusServerImpl {
indexedConsensusRequest.getSearchIndex());
}
// TODO wal and memtable
- TSStatus result = stateMachine.write(indexedConsensusRequest);
+ IConsensusRequest planNode = stateMachine.deserializeRequest(indexedConsensusRequest);
+ TSStatus result = stateMachine.write(planNode);
long writeToStateMachineEndTime = System.currentTimeMillis();
// statistic the time of writing request into stateMachine
MetricService.getInstance()
@@ -491,7 +499,8 @@ public class IoTConsensusServerImpl {
// after current operation
// TODO: (xingtanzjr) design more reliable way for IoTConsensus
logger.error(
- "cannot notify {} to build sync log channel. Please check the status of this node manually",
+ "cannot notify {} to build sync log channel. "
+ + "Please check the status of this node manually",
peer,
e);
}
@@ -800,4 +809,111 @@ public class IoTConsensusServerImpl {
reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
}
}
+
+ public TSStatus syncLog(String sourcePeerId, IConsensusRequest request) {
+ return cacheQueueMap
+ .computeIfAbsent(sourcePeerId, SyncLogCacheQueue::new)
+ .cacheAndInsertLatestNode((DeserializedBatchIndexedConsensusRequest) request);
+ }
+
+ /**
+ * This method is used for write of IoTConsensus SyncLog. By this method, we can keep write order
+ * in follower the same as the leader. And besides order insurance, we can make the
+ * deserialization of PlanNode to be concurrent
+ */
+ private class SyncLogCacheQueue {
+ private final String sourcePeerId;
+ private final Lock queueLock = new ReentrantLock();
+ private final Condition queueSortCondition = queueLock.newCondition();
+ private final PriorityQueue<DeserializedBatchIndexedConsensusRequest> requestCache;
+ private long nextSyncIndex = -1;
+
+ public SyncLogCacheQueue(String sourcePeerId) {
+ this.sourcePeerId = sourcePeerId;
+ this.requestCache = new PriorityQueue<>();
+ }
+
+ /**
+ * This method is used for write of IoTConsensus SyncLog. By this method, we can keep write
+ * order in follower the same as the leader. And besides order insurance, we can make the
+ * deserialization of PlanNode to be concurrent
+ */
+ private TSStatus cacheAndInsertLatestNode(DeserializedBatchIndexedConsensusRequest request) {
+ queueLock.lock();
+ try {
+ requestCache.add(request);
+ // If the peek is not hold by current thread, it should notify the corresponding thread to
+ // process the peek when the queue is full
+ if (requestCache.size() == config.getReplication().getMaxPendingBatchesNum()
+ && requestCache.peek() != null
+ && requestCache.peek().getStartSyncIndex() != request.getStartSyncIndex()) {
+ queueSortCondition.signalAll();
+ }
+ while (true) {
+ // If current InsertNode is the next target InsertNode, write it
+ if (request.getStartSyncIndex() == nextSyncIndex) {
+ requestCache.remove(request);
+ nextSyncIndex = request.getEndSyncIndex() + 1;
+ break;
+ }
+ // If all write thread doesn't hit nextSyncIndex and the heap is full, write
+ // the peek request. This is used to keep the whole write correct when nextSyncIndex
+ // is not set. We won't persist the value of nextSyncIndex to reduce the complexity.
+ // There are some cases that nextSyncIndex is not set:
+ // 1. When the system was just started
+ // 2. When some exception occurs during SyncLog
+ if (requestCache.size() == config.getReplication().getMaxPendingBatchesNum()
+ && requestCache.peek() != null
+ && requestCache.peek().getStartSyncIndex() == request.getStartSyncIndex()) {
+ requestCache.remove();
+ nextSyncIndex = request.getEndSyncIndex() + 1;
+ break;
+ }
+ try {
+ boolean timeout =
+ !queueSortCondition.await(
+ config.getReplication().getMaxWaitingTimeForWaitBatchInMs(),
+ TimeUnit.MILLISECONDS);
+ if (timeout) {
+ // although the timeout is triggered, current thread cannot write its request
+ // if current thread does not hold the peek request. And there should be some
+ // other thread who hold the peek request. In this scenario, current thread
+ // should go into await again and wait until its request becoming peek request
+ if (requestCache.peek() != null
+ && requestCache.peek().getStartSyncIndex() == request.getStartSyncIndex()) {
+ // current thread hold the peek request thus it can write the peek immediately.
+ logger.info(
+ "waiting target request timeout. current index: {}, target index: {}",
+ request.getStartSyncIndex(),
+ nextSyncIndex);
+ requestCache.remove(request);
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.warn(
+ "current waiting is interrupted. SyncIndex: {}. Exception: {}",
+ request.getStartSyncIndex(),
+ e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ logger.debug(
+ "source = {}, region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
+ sourcePeerId,
+ consensusGroupId,
+ requestCache.size(),
+ request.getStartSyncIndex(),
+ request.getEndSyncIndex());
+ List<TSStatus> subStatus = new LinkedList<>();
+ for (IConsensusRequest insertNode : request.getInsertNodes()) {
+ subStatus.add(stateMachine.write(insertNode));
+ }
+ queueSortCondition.signalAll();
+ return new TSStatus().setSubStatus(subStatus);
+ } finally {
+ queueLock.unlock();
+ }
+ }
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
index 90e5aed12c..92b2cc28cf 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
@@ -32,7 +32,9 @@ import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
public class IoTConsensusClientPool {
- private IoTConsensusClientPool() {}
+ private IoTConsensusClientPool() {
+ // not meant to be instantiated; the class is a holder for the nested client pool factories
+ }
public static class SyncIoTConsensusServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, SyncIoTConsensusServiceClient> {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index 7630849928..116a5f3c96 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -126,7 +126,8 @@ public class IndexController {
// because it won't infect the correctness
logger.info(
"failed to flush sync index because previous version file {} does not exists. "
- + "It may be caused by the target Peer is removed from current group. target file is {}",
+ + "It may be caused by the target Peer is removed from current group. "
+ + "target file is {}",
oldFile.getAbsolutePath(),
newFile.getAbsolutePath());
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 93de711b76..883952275a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -234,7 +234,7 @@ public class LogDispatcher {
return bufferedEntries.size();
}
- /** try to offer a request into queue with memory control */
+ /** try to offer a request into queue with memory control. */
public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getSerializedSize(), true)) {
return false;
@@ -254,7 +254,7 @@ public class LogDispatcher {
return success;
}
- /** try to remove a request from queue with memory control */
+ /** Release the memory reserved for the given request (its serialized size). */
private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) {
iotConsensusMemoryManager.free(indexedConsensusRequest.getSerializedSize());
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index 891961d70b..3f0c1477d0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -39,7 +39,7 @@ public class SyncStatus {
this.config = config;
}
- /** we may block here if the synchronization pipeline is full */
+ /** we may block here if the synchronization pipeline is full. */
public void addNextBatch(Batch batch) throws InterruptedException {
synchronized (this) {
while (pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
@@ -86,7 +86,7 @@ public class SyncStatus {
iotConsensusMemoryManager.free(size);
}
- /** Gets the first index that is not currently synchronized */
+ /** Gets the first index that is not currently synchronized. */
public long getNextSendingIndex() {
// we do not use ReentrantReadWriteLock because there will be only one thread reading this field
synchronized (this) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 6b24f596a2..0921d0a962 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.iot.IoTConsensus;
@@ -114,7 +115,10 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy
: ByteBufferConsensusRequest::new)
.collect(Collectors.toList())));
}
- TSStatus writeStatus = impl.getStateMachine().write(logEntriesInThisBatch);
+ IConsensusRequest deserializedRequest =
+ impl.getStateMachine().deserializeRequest(logEntriesInThisBatch);
+ TSStatus writeStatus =
+ impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), deserializedRequest);
logger.debug(
"execute TSyncLogEntriesReq for {} with result {}",
req.consensusGroupId,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index c262996089..10acca5386 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -134,7 +134,9 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
if (!firstTry) {
Thread.sleep(retryPolicy.getSleepTime());
}
- TSStatus result = applicationStateMachine.write(applicationRequest);
+ IConsensusRequest deserializedRequest =
+ applicationStateMachine.deserializeRequest(applicationRequest);
+ TSStatus result = applicationStateMachine.write(deserializedRequest);
if (firstTry) {
finalStatus = result;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java
index bb3ef2f557..3f2b68857c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java
@@ -67,6 +67,11 @@ public class SimpleServerImpl implements IStateMachine {
return stateMachine.write(request);
}
+ /**
+ * Delegate deserialization to the wrapped state machine, consistent with write() and read(),
+ * which also delegate to it. Returning the request unchanged would hand a raw request to the
+ * wrapped state machine's write().
+ */
+ @Override
+ public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+ return stateMachine.deserializeRequest(request);
+ }
+
@Override
public DataSet read(IConsensusRequest request) {
return stateMachine.read(request);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index 01992f472f..4580a865c2 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -38,6 +38,11 @@ public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi
return new TSStatus(0);
}
+ @Override
+ public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+ // test stub: requests are passed through unchanged
+ return request;
+ }
+
@Override
public DataSet read(IConsensusRequest IConsensusRequest) {
return null;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
index 91e614bb72..2aa22b1616 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
+import org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
@@ -76,12 +77,12 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
if (request instanceof IndexedConsensusRequest) {
writeOneRequest((IndexedConsensusRequest) request);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } else if (request instanceof BatchIndexedConsensusRequest) {
- BatchIndexedConsensusRequest batchIndexedConsensusRequest =
- (BatchIndexedConsensusRequest) request;
+ } else if (request instanceof DeserializedBatchIndexedConsensusRequest) {
+ DeserializedBatchIndexedConsensusRequest batchIndexedConsensusRequest =
+ (DeserializedBatchIndexedConsensusRequest) request;
List<TSStatus> subStatus = new ArrayList<>();
- for (IndexedConsensusRequest innerRequest : batchIndexedConsensusRequest.getRequests()) {
- writeOneRequest(innerRequest);
+ for (IConsensusRequest innerRequest : batchIndexedConsensusRequest.getInsertNodes()) {
+ writeOneRequest((IndexedConsensusRequest) innerRequest);
subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
return new TSStatus().setSubStatus(subStatus);
@@ -92,6 +93,24 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
}
}
+  /**
+   * Wrap a {@link BatchIndexedConsensusRequest} into a {@link
+   * DeserializedBatchIndexedConsensusRequest} carrying the same sync index range and inner
+   * requests; any other request is returned unchanged.
+   */
+  @Override
+  public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+    if (!(request instanceof BatchIndexedConsensusRequest)) {
+      // single requests are already in the form write() expects
+      return request;
+    }
+    BatchIndexedConsensusRequest batch = (BatchIndexedConsensusRequest) request;
+    DeserializedBatchIndexedConsensusRequest deserialized =
+        new DeserializedBatchIndexedConsensusRequest(
+            batch.getStartSyncIndex(), batch.getEndSyncIndex(), batch.getRequests().size());
+    batch.getRequests().forEach(deserialized::add);
+    return deserialized;
+  }
+
private void writeOneRequest(IndexedConsensusRequest indexedConsensusRequest) {
List<IConsensusRequest> transformedRequest = new ArrayList<>();
for (IConsensusRequest innerRequest : indexedConsensusRequest.getRequests()) {
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 0c66c38330..03682b0312 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -89,16 +89,21 @@ public class TestUtils {
@Override
public TSStatus write(IConsensusRequest request) {
+ // the request has already been turned into a TestRequest by deserializeRequest()
+ if (((TestRequest) request).isIncr()) {
+ integer.incrementAndGet();
+ }
+ return new TSStatus(200);
+ }
+
+ @Override
+ public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+ // serialized requests arrive as ByteBufferConsensusRequest and are decoded into a TestRequest
TestRequest testRequest;
if (request instanceof ByteBufferConsensusRequest) {
testRequest = new TestRequest(request.serializeToByteBuffer());
} else {
testRequest = (TestRequest) request;
}
- if (testRequest.isIncr()) {
- integer.incrementAndGet();
- }
- return new TSStatus(200);
+ return testRequest;
}
@Override
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
index 8bea3db3d5..558c8bdf4f 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
@@ -108,6 +108,11 @@ public class SimpleConsensusTest {
return new TSStatus();
}
+ /** Identity conversion: returns the request unchanged. */
+ @Override
+ public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+ return request;
+ }
+
@Override
public DataSet read(IConsensusRequest request) {
return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 104fbc087f..905a8275c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.consensus;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
@@ -31,7 +32,9 @@ import org.apache.iotdb.consensus.config.RatisConfig.Snapshot;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
+import org.apache.iotdb.db.consensus.statemachine.IoTConsensusDataRegionStateMachine;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
@@ -48,7 +51,9 @@ public class DataRegionConsensusImpl {
private static IConsensus INSTANCE = null;
- private DataRegionConsensusImpl() {}
+ private DataRegionConsensusImpl() {
+ // Not meant to be instantiated; callers use the static getInstance().
+ }
// need to create instance before calling this method
public static IConsensus getInstance() {
@@ -171,9 +176,7 @@ public class DataRegionConsensusImpl {
.build())
.build())
.build(),
- gid ->
- new DataRegionStateMachine(
- StorageEngine.getInstance().getDataRegion((DataRegionId) gid)))
+ DataRegionConsensusImpl::createDataRegionStateMachine)
.orElseThrow(
() ->
new IllegalArgumentException(
@@ -183,4 +186,13 @@ public class DataRegionConsensusImpl {
}
return INSTANCE;
}
+
+ /**
+ * Builds the state machine for one data region: the IoTConsensus-specific subclass when the data
+ * region consensus protocol is IoTConsensus, the plain DataRegionStateMachine otherwise.
+ */
+ private static DataRegionStateMachine createDataRegionStateMachine(ConsensusGroupId gid) {
+ DataRegion region = StorageEngine.getInstance().getDataRegion((DataRegionId) gid);
+ boolean isIoTConsensus =
+ ConsensusFactory.IOT_CONSENSUS.equals(conf.getDataRegionConsensusProtocolClass());
+ return isIoTConsensus
+ ? new IoTConsensusDataRegionStateMachine(region)
+ : new DataRegionStateMachine(region);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index f36429e702..5c9b4016ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -22,11 +22,9 @@ package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.wal.buffer.WALEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,12 +47,15 @@ public abstract class BaseStateMachine
return instance;
}
+ /**
+ * Default conversion of an incoming request: delegates to getPlanNode, so a subclass that
+ * overrides getPlanNode also changes what this method returns.
+ */
+ @Override
+ public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+ return getPlanNode(request);
+ }
+
protected PlanNode getPlanNode(IConsensusRequest request) {
PlanNode node;
if (request instanceof ByteBufferConsensusRequest) {
node = PlanNodeType.deserialize(request.serializeToByteBuffer());
- } else if (request instanceof IoTConsensusRequest) {
- node = WALEntry.deserializeForConsensus(request.serializeToByteBuffer());
} else if (request instanceof PlanNode) {
node = (PlanNode) request;
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index c2c8bd0b7e..f7a47cbc37 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan;
@@ -55,14 +54,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
-import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
public class DataRegionStateMachine extends BaseStateMachine {
@@ -72,24 +64,21 @@ public class DataRegionStateMachine extends BaseStateMachine {
private static final FragmentInstanceManager QUERY_INSTANCE_MANAGER =
FragmentInstanceManager.getInstance();
- private DataRegion region;
-
- private static final int MAX_REQUEST_CACHE_SIZE = 5;
- private static final long CACHE_WINDOW_TIME_IN_MS =
- IoTDBDescriptor.getInstance().getConfig().getCacheWindowTimeInMs();
-
- private ConcurrentHashMap<String, SyncLogCacheQueue> cacheQueueMap;
+ protected DataRegion region;
public DataRegionStateMachine(DataRegion region) {
this.region = region;
- this.cacheQueueMap = new ConcurrentHashMap<>();
}
@Override
- public void start() {}
+ public void start() {
+ // Intentionally a no-op: this state machine has nothing to start.
+ }
@Override
- public void stop() {}
+ public void stop() {
+ // Intentionally a no-op: this state machine has nothing to stop.
+ }
@Override
public boolean isReadOnly() {
@@ -151,146 +140,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
}
- /**
- * This method is used for write of IoTConsensus SyncLog. By this method, we can keep write order
- * in follower the same as the leader. And besides order insurance, we can make the
- * deserialization of PlanNode to be concurrent
- */
- private class SyncLogCacheQueue {
- private final String sourcePeerId;
- private final Lock queueLock = new ReentrantLock();
- private final Condition queueSortCondition = queueLock.newCondition();
- private final PriorityQueue<InsertNodeWrapper> requestCache;
- private long nextSyncIndex = -1;
-
- public SyncLogCacheQueue(String sourcePeerId, int queueSize, long timeout) {
- this.sourcePeerId = sourcePeerId;
- this.requestCache = new PriorityQueue<>();
- }
-
- /**
- * This method is used for write of IoTConsensus SyncLog. By this method, we can keep write
- * order in follower the same as the leader. And besides order insurance, we can make the
- * deserialization of PlanNode to be concurrent
- */
- private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
- queueLock.lock();
- try {
- requestCache.add(insertNodeWrapper);
- // If the peek is not hold by current thread, it should notify the corresponding thread to
- // process the peek when the queue is full
- if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
- && requestCache.peek().getStartSyncIndex() != insertNodeWrapper.getStartSyncIndex()) {
- queueSortCondition.signalAll();
- }
- while (true) {
- // If current InsertNode is the next target InsertNode, write it
- if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
- requestCache.remove(insertNodeWrapper);
- nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
- break;
- }
- // If all write thread doesn't hit nextSyncIndex and the heap is full, write
- // the peek request. This is used to keep the whole write correct when nextSyncIndex
- // is not set. We won't persist the value of nextSyncIndex to reduce the complexity.
- // There are some cases that nextSyncIndex is not set:
- // 1. When the system was just started
- // 2. When some exception occurs during SyncLog
- if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
- && requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
- requestCache.remove();
- nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
- break;
- }
- try {
- boolean timeout =
- !queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
- if (timeout) {
- // although the timeout is triggered, current thread cannot write its request
- // if current thread does not hold the peek request. And there should be some
- // other thread who hold the peek request. In this scenario, current thread
- // should go into await again and wait until its request becoming peek request
- if (requestCache.peek().getStartSyncIndex()
- == insertNodeWrapper.getStartSyncIndex()) {
- // current thread hold the peek request thus it can write the peek immediately.
- logger.info(
- "waiting target request timeout. current index: {}, target index: {}",
- insertNodeWrapper.getStartSyncIndex(),
- nextSyncIndex);
- requestCache.remove(insertNodeWrapper);
- break;
- }
- }
- } catch (InterruptedException e) {
- logger.warn(
- "current waiting is interrupted. SyncIndex: {}. Exception: {}",
- insertNodeWrapper.getStartSyncIndex(),
- e);
- Thread.currentThread().interrupt();
- }
- }
- logger.debug(
- "source = {}, region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
- sourcePeerId,
- region.getDataRegionId(),
- requestCache.size(),
- insertNodeWrapper.getStartSyncIndex(),
- insertNodeWrapper.getEndSyncIndex());
- List<TSStatus> subStatus = new LinkedList<>();
- for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) {
- subStatus.add(write(planNode));
- }
- queueSortCondition.signalAll();
- return new TSStatus().setSubStatus(subStatus);
- } finally {
- queueLock.unlock();
- }
- }
- }
-
- private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
- private final long startSyncIndex;
- private final long endSyncIndex;
- private final List<PlanNode> insertNodes;
-
- public InsertNodeWrapper(long startSyncIndex, long endSyncIndex) {
- this.startSyncIndex = startSyncIndex;
- this.endSyncIndex = endSyncIndex;
- this.insertNodes = new LinkedList<>();
- }
-
- @Override
- public int compareTo(InsertNodeWrapper o) {
- return Long.compare(startSyncIndex, o.startSyncIndex);
- }
-
- public void add(PlanNode insertNode) {
- this.insertNodes.add(insertNode);
- }
-
- public long getStartSyncIndex() {
- return startSyncIndex;
- }
-
- public long getEndSyncIndex() {
- return endSyncIndex;
- }
-
- public List<PlanNode> getInsertNodes() {
- return insertNodes;
- }
- }
-
- private InsertNodeWrapper deserializeAndWrap(BatchIndexedConsensusRequest batchRequest) {
- InsertNodeWrapper insertNodeWrapper =
- new InsertNodeWrapper(batchRequest.getStartSyncIndex(), batchRequest.getEndSyncIndex());
- for (IndexedConsensusRequest indexedRequest : batchRequest.getRequests()) {
- insertNodeWrapper.add(grabInsertNode(indexedRequest));
- }
- return insertNodeWrapper;
- }
-
- private PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
+ protected PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
for (IConsensusRequest req : indexedRequest.getRequests()) {
// PlanNode in IndexedConsensusRequest should always be InsertNode
@@ -305,63 +155,22 @@ public class DataRegionStateMachine extends BaseStateMachine {
return planNode;
} else {
throw new IllegalArgumentException(
- "PlanNodes in IndexedConsensusRequest are not InsertNode and the size of requests are larger than 1");
+ "PlanNodes in IndexedConsensusRequest are not InsertNode and "
+ + "the size of requests are larger than 1");
}
}
return mergeInsertNodes(insertNodes);
}
- @Override
- public List<Path> getSnapshotFiles(File latestSnapshotRootDir) {
- try {
- return new SnapshotLoader(
- latestSnapshotRootDir.getAbsolutePath(),
- region.getDatabaseName(),
- region.getDataRegionId())
- .getSnapshotFileInfo().stream().map(File::toPath).collect(Collectors.toList());
- } catch (IOException e) {
- logger.error(
- "Meets error when getting snapshot files for {}-{}",
- region.getDatabaseName(),
- region.getDataRegionId(),
- e);
- return null;
- }
- }
-
- @Override
- public TSStatus write(IConsensusRequest request) {
- PlanNode planNode;
- try {
- if (request instanceof IndexedConsensusRequest) {
- IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) request;
- planNode = grabInsertNode(indexedRequest);
- } else if (request instanceof BatchIndexedConsensusRequest) {
- InsertNodeWrapper insertNodeWrapper =
- deserializeAndWrap((BatchIndexedConsensusRequest) request);
- String sourcePeerId = ((BatchIndexedConsensusRequest) request).getSourcePeerId();
- return cacheQueueMap
- .computeIfAbsent(
- sourcePeerId,
- k -> new SyncLogCacheQueue(k, MAX_REQUEST_CACHE_SIZE, CACHE_WINDOW_TIME_IN_MS))
- .cacheAndInsertLatestNode(insertNodeWrapper);
- } else {
- planNode = getPlanNode(request);
- }
- return write(planNode);
- } catch (IllegalArgumentException e) {
- logger.error(e.getMessage(), e);
- return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- }
- }
-
/**
* Merge insert nodes sharing same search index ( e.g. tablet-100, tablet-100, tablet-100 will be
* merged to one multi-tablet). <br>
* Notice: the continuity of insert nodes sharing same search index should be protected by the
* upper layer.
+ *
+ * @exception RuntimeException when insertNodes is empty
*/
- private InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
+ protected InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
int size = insertNodes.size();
if (size == 0) {
throw new RuntimeException();
@@ -407,6 +216,34 @@ public class DataRegionStateMachine extends BaseStateMachine {
return result;
}
+ /**
+ * Lists the files of the snapshot stored under {@code latestSnapshotRootDir} for this region.
+ *
+ * @param latestSnapshotRootDir root directory of the latest snapshot
+ * @return the snapshot file paths, or {@code null} if they cannot be read (the error is logged)
+ */
+ @Override
+ public List<Path> getSnapshotFiles(File latestSnapshotRootDir) {
+ try {
+ SnapshotLoader snapshotLoader =
+ new SnapshotLoader(
+ latestSnapshotRootDir.getAbsolutePath(),
+ region.getDatabaseName(),
+ region.getDataRegionId());
+ return snapshotLoader.getSnapshotFileInfo().stream()
+ .map(File::toPath)
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ logger.error(
+ "Meets error when getting snapshot files for {}-{}",
+ region.getDatabaseName(),
+ region.getDataRegionId(),
+ e);
+ return null;
+ }
+ }
+
+ /**
+ * Applies a write request to the data region.
+ *
+ * <p>The request is expected to be a PlanNode produced by deserializeRequest. It is still routed
+ * through getPlanNode, which passes PlanNodes through unchanged and is expected to reject
+ * unsupported types with IllegalArgumentException. Such requests are therefore logged and answered
+ * with INTERNAL_SERVER_ERROR instead of escaping as a ClassCastException from a direct cast.
+ *
+ * @param request the (normally already deserialized) write request
+ * @return the write status, or INTERNAL_SERVER_ERROR if an IllegalArgumentException is raised
+ */
+ @Override
+ public TSStatus write(IConsensusRequest request) {
+ try {
+ return write(getPlanNode(request));
+ } catch (IllegalArgumentException e) {
+ logger.error(e.getMessage(), e);
+ return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ }
+
protected TSStatus write(PlanNode planNode) {
return planNode.accept(new DataExecutionVisitor(), region);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
new file mode 100644
index 0000000000..26d52eed22
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus.statemachine;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Data region state machine used when the data region consensus protocol is IoTConsensus.
+ *
+ * <p>Besides plain requests, it accepts the IoTConsensus request types ({@link
+ * IndexedConsensusRequest}, {@link BatchIndexedConsensusRequest}, {@link IoTConsensusRequest}).
+ */
+public class IoTConsensusDataRegionStateMachine extends DataRegionStateMachine {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(IoTConsensusDataRegionStateMachine.class);
+
+ public IoTConsensusDataRegionStateMachine(DataRegion region) {
+ super(region);
+ }
+
+ /**
+ * Applies a request produced by {@link #deserializeRequest}.
+ *
+ * <p>A {@link DeserializedBatchIndexedConsensusRequest} is applied node by node, collecting one
+ * sub-status per node; any other request is applied as a single {@link PlanNode}.
+ *
+ * @param request the deserialized write request
+ * @return the write status, or INTERNAL_SERVER_ERROR if an IllegalArgumentException is raised
+ */
+ @Override
+ public TSStatus write(IConsensusRequest request) {
+ try {
+ // Both branches use getPlanNode: PlanNodes pass through, raw requests are decoded, and any
+ // other type is rejected with IllegalArgumentException (handled below), not a ClassCastException.
+ if (request instanceof DeserializedBatchIndexedConsensusRequest) {
+ List<TSStatus> subStatus = new LinkedList<>();
+ for (IConsensusRequest consensusRequest :
+ ((DeserializedBatchIndexedConsensusRequest) request).getInsertNodes()) {
+ subStatus.add(write(getPlanNode(consensusRequest)));
+ }
+ return new TSStatus().setSubStatus(subStatus);
+ }
+ return write(getPlanNode(request));
+ } catch (IllegalArgumentException e) {
+ logger.error(e.getMessage(), e);
+ return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ }
+
+ /**
+ * Converts a raw request into the form expected by {@link #write(IConsensusRequest)}.
+ *
+ * <ul>
+ * <li>{@link IndexedConsensusRequest}: one PlanNode built by {@code grabInsertNode}.
+ * <li>{@link BatchIndexedConsensusRequest}: a {@link DeserializedBatchIndexedConsensusRequest}
+ * with the same sync-index range and one PlanNode per inner indexed request.
+ * <li>anything else: the result of {@link #getPlanNode(IConsensusRequest)}.
+ * </ul>
+ *
+ * @param request write request as received by the consensus layer
+ * @return the deserialized request
+ * @throws IllegalArgumentException if the request, or a request inside it, is not supported
+ */
+ @Override
+ public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+ if (request instanceof IndexedConsensusRequest) {
+ return grabInsertNode((IndexedConsensusRequest) request);
+ }
+ if (request instanceof BatchIndexedConsensusRequest) {
+ BatchIndexedConsensusRequest batchRequest = (BatchIndexedConsensusRequest) request;
+ DeserializedBatchIndexedConsensusRequest deserializedRequest =
+ new DeserializedBatchIndexedConsensusRequest(
+ batchRequest.getStartSyncIndex(),
+ batchRequest.getEndSyncIndex(),
+ batchRequest.getRequests().size());
+ for (IndexedConsensusRequest indexedRequest : batchRequest.getRequests()) {
+ deserializedRequest.add(grabInsertNode(indexedRequest));
+ }
+ return deserializedRequest;
+ }
+ return getPlanNode(request);
+ }
+
+ /**
+ * Extends the base conversion with {@link IoTConsensusRequest}, which is decoded through {@link
+ * WALEntry#deserializeForConsensus}.
+ *
+ * @throws IllegalArgumentException if the request type is not supported
+ */
+ @Override
+ protected PlanNode getPlanNode(IConsensusRequest request) {
+ PlanNode node;
+ if (request instanceof ByteBufferConsensusRequest) {
+ node = PlanNodeType.deserialize(request.serializeToByteBuffer());
+ } else if (request instanceof IoTConsensusRequest) {
+ node = WALEntry.deserializeForConsensus(request.serializeToByteBuffer());
+ } else if (request instanceof PlanNode) {
+ node = (PlanNode) request;
+ } else {
+ logger.error("Unexpected IConsensusRequest : {}", request);
+ throw new IllegalArgumentException("Unexpected IConsensusRequest!");
+ }
+ return node;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index c324b35d07..9b20fbb13f 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -47,10 +48,14 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
}
@Override
- public void start() {}
+ public void start() {
+ // Intentionally a no-op: this state machine has nothing to start.
+ }
@Override
- public void stop() {}
+ public void stop() {
+ // Intentionally a no-op: this state machine has nothing to stop.
+ }
@Override
public boolean isReadOnly() {
@@ -70,7 +75,7 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
@Override
public TSStatus write(IConsensusRequest request) {
try {
- return getPlanNode(request).accept(new SchemaExecutionVisitor(), schemaRegion);
+ return ((PlanNode) request).accept(new SchemaExecutionVisitor(), schemaRegion);
} catch (IllegalArgumentException e) {
logger.error(e.getMessage(), e);
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());