You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/01 10:15:26 UTC
[iotdb] 01/01: change to Async Write exp
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0729_test_exp1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 336e861b18738f289ce7197274687452b259fe18
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Aug 1 18:15:08 2022 +0800
change to Async Write exp
---
.../org/apache/iotdb/consensus/IStateMachine.java | 7 ++
.../service/MultiLeaderRPCServiceProcessor.java | 23 ++---
.../consensus/statemachine/BaseStateMachine.java | 12 +++
.../statemachine/DataRegionStateMachine.java | 115 +++++++++++----------
.../plan/planner/plan/node/write/InsertNode.java | 10 --
5 files changed, 88 insertions(+), 79 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index 203b7d2a6b..3b5e6b1796 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -25,6 +25,10 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+
+import org.apache.thrift.async.AsyncMethodCallback;
import javax.annotation.concurrent.ThreadSafe;
@@ -41,6 +45,9 @@ public interface IStateMachine {
void stop();
+ /**
+ * Applies a batch of requests replicated from a remote MultiLeader peer and reports the
+ * per-request statuses through {@code resultHandler}.
+ *
+ * <p>Implementations may complete {@code resultHandler} after this method returns. The default
+ * implementation rejects the call, so state machines that do not support asynchronous
+ * MultiLeader writes are not forced to implement it.
+ *
+ * @param requests requests to apply
+ * @param resultHandler callback to complete with a {@link TSyncLogRes} or an error
+ * @throws UnsupportedOperationException if this state machine does not support the call
+ */
+ default void multiLeaderWriteAsync(
+ List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
+ throw new UnsupportedOperationException(
+ "multiLeaderWriteAsync is not supported by " + getClass().getName());
+ }
+
/**
* apply a write-request from user
*
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index b24be2a1e6..157b8b0207 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface {
@@ -67,7 +69,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
return;
}
- List<TSStatus> statuses = new ArrayList<>();
+ List<IndexedConsensusRequest> indexedConsensusRequests = new LinkedList<>();
// We use synchronized to ensure atomicity of executing multiple logs
if (!req.getBatches().isEmpty()) {
List<IConsensusRequest> consensusRequests = new ArrayList<>();
@@ -79,11 +81,9 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
: new ByteBufferConsensusRequest(batch.data);
// merge TLogBatch with same search index into one request
if (batch.getSearchIndex() != currentSearchIndex) {
- statuses.add(
- impl.getStateMachine()
- .write(
- impl.buildIndexedConsensusRequestForRemoteRequest(
- currentSearchIndex, consensusRequests)));
+ indexedConsensusRequests.add(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ currentSearchIndex, consensusRequests));
consensusRequests = new ArrayList<>();
currentSearchIndex = batch.getSearchIndex();
}
@@ -91,15 +91,12 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
}
// write last request
if (!consensusRequests.isEmpty()) {
- statuses.add(
- impl.getStateMachine()
- .write(
- impl.buildIndexedConsensusRequestForRemoteRequest(
- currentSearchIndex, consensusRequests)));
+ indexedConsensusRequests.add(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ currentSearchIndex, consensusRequests));
}
+ impl.getStateMachine().multiLeaderWriteAsync(indexedConsensusRequests, resultHandler);
}
- logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
- resultHandler.onComplete(new TSyncLogRes(statuses));
} catch (Exception e) {
resultHandler.onError(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 32333f26e9..245a44adc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -22,19 +22,31 @@ package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.ws.rs.NotSupportedException;
+
+import java.util.List;
+
public abstract class BaseStateMachine implements IStateMachine, IStateMachine.EventApi {
private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
+ /**
+ * Rejects asynchronous MultiLeader writes; state machines that support them override this.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void multiLeaderWriteAsync(
+ List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
+ // a JAX-RS NotSupportedException is an HTTP-layer type; the JDK exception is the right signal
+ throw new UnsupportedOperationException(
+ "multiLeaderWriteAsync is not supported by " + getClass().getSimpleName());
+ }
+
protected FragmentInstance getFragmentInstance(IConsensusRequest request) {
FragmentInstance instance;
if (request instanceof ByteBufferConsensusRequest) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 0fd9dc5a76..32847e2eb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -44,13 +44,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.thrift.async.AsyncMethodCallback;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
@@ -65,11 +66,11 @@ public class DataRegionStateMachine extends BaseStateMachine {
private static final int MAX_REQUEST_CACHE_SIZE = 5;
private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
- private final PriorityQueue<InsertNode> requestCache;
+ private final PriorityQueue<InsertNodeWrapper> requestCache;
public DataRegionStateMachine(DataRegion region) {
this.region = region;
- this.requestCache = new PriorityQueue<>(Comparator.comparingLong(InsertNode::getSyncIndex));
+ this.requestCache = new PriorityQueue<>();
}
@Override
@@ -114,56 +115,31 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
}
- private InsertNode cacheAndGetLatestInsertNode(long syncIndex, InsertNode insertNode) {
- insertNode.setSyncIndex(syncIndex);
+ /**
+ * Buffers one replicated batch in {@code requestCache} and, when that insertion brings the
+ * cache to {@code MAX_REQUEST_CACHE_SIZE} entries, removes and returns the queue head;
+ * otherwise returns {@code null}.
+ *
+ * <p>The head is chosen by the queue's natural ordering ({@link InsertNodeWrapper#compareTo},
+ * presumably by sync index — TODO confirm), so the returned wrapper may belong to a different
+ * caller than the one passing {@code resultHandler}.
+ *
+ * <p>NOTE(review): this method removes an entry only when an insertion fills the cache, so
+ * unless something else drains {@code requestCache}, up to {@code MAX_REQUEST_CACHE_SIZE - 1}
+ * buffered handlers wait indefinitely once traffic stops. The {@code CACHE_WINDOW_TIME_IN_MS}
+ * wait used by the removed implementation is no longer applied here — confirm intended.
+ */
+ private InsertNodeWrapper cacheAndGetLatestInsertNode(
+ long syncIndex,
+ List<InsertNode> insertNodes,
+ AsyncMethodCallback<TSyncLogRes> resultHandler) {
synchronized (requestCache) {
- requestCache.add(insertNode);
- requestCache.notifyAll();
- while (!(requestCache.size() >= MAX_REQUEST_CACHE_SIZE
- && requestCache.peek().getSyncIndex() == syncIndex)) {
- try {
- requestCache.wait(CACHE_WINDOW_TIME_IN_MS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ requestCache.add(new InsertNodeWrapper(syncIndex, insertNodes, resultHandler));
+ if (requestCache.size() == MAX_REQUEST_CACHE_SIZE) {
+ return requestCache.poll();
}
- requestCache.notifyAll();
- InsertNode nextNode = requestCache.poll();
- // logger.info("queue size {}, syncIndex = {}", requestCache.size(),
- // nextNode.getSyncIndex());
- return nextNode;
+ return null;
}
}
- // private InsertNode cacheAndGetLatestInsertNode2(long syncIndex, InsertNode insertNode) {
- // insertNode.setSyncIndex(syncIndex);
- // synchronized (requestCache) {
- // requestCache.add(insertNode);
- // if (requestCache.size() == MAX_REQUEST_CACHE_SIZE) {
- // if (insertNode == requestCache.peek()) {
- // return requestCache.poll();
- // } else {
- // requestCache.poll().notify();
- // }
- // }
- // }
- //
- // synchronized (requestCache) {
- // try {
- // insertNode.wait(CACHE_WINDOW_TIME_IN_MS);
- // } catch (InterruptedException e) {
- // Thread.currentThread().interrupt();
- // }
- // }
- // }
-
private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
private final long syncIndex;
- private final InsertNode insertNode;
+ private final List<InsertNode> insertNodes;
+ private final AsyncMethodCallback<TSyncLogRes> resultHandler;
- public InsertNodeWrapper(long syncIndex, InsertNode insertNode) {
+ /**
+ * Bundles one buffered batch with the callback that must be answered once it is written.
+ *
+ * @param syncIndex sync index of the batch (presumably what {@code compareTo} orders by)
+ * @param insertNodes merged insert nodes of the batch, one per indexed request
+ * @param resultHandler callback of the RPC that delivered the batch
+ */
+ public InsertNodeWrapper(
+ long syncIndex,
+ List<InsertNode> insertNodes,
+ AsyncMethodCallback<TSyncLogRes> resultHandler) {
this.syncIndex = syncIndex;
- this.insertNode = insertNode;
+ this.insertNodes = insertNodes;
+ this.resultHandler = resultHandler;
}
@Override
@@ -175,8 +151,43 @@ public class DataRegionStateMachine extends BaseStateMachine {
return syncIndex;
}
- public InsertNode getInsertNode() {
- return insertNode;
+ /** Returns the insert nodes held by this wrapper (the list passed to the constructor). */
+ public List<InsertNode> getInsertNodes() {
+ return insertNodes;
+ }
+
+ /** Returns the callback of the RPC that delivered this batch. */
+ public AsyncMethodCallback<TSyncLogRes> getResultHandler() {
+ return resultHandler;
+ }
+ }
+
+ /**
+ * Writes every node of {@code wrapper} and answers the RPC that originally delivered it.
+ *
+ * <p>A failure is reported to the wrapper's own handler, which may belong to a different RPC
+ * than the one whose call triggered this write.
+ */
+ private void writeAndComplete(InsertNodeWrapper wrapper) {
+ List<TSStatus> statuses = new ArrayList<>(wrapper.getInsertNodes().size());
+ try {
+ for (InsertNode insertNode : wrapper.getInsertNodes()) {
+ statuses.add(write(insertNode));
+ }
+ } catch (RuntimeException e) {
+ logger.error(e.getMessage(), e);
+ wrapper.getResultHandler().onError(e);
+ return;
+ }
+ wrapper.getResultHandler().onComplete(new TSyncLogRes(statuses));
+ }
+
+ /**
+ * Applies a batch of requests replicated from a remote MultiLeader peer asynchronously.
+ *
+ * <p>Each {@link IndexedConsensusRequest} is converted into one merged {@link InsertNode} and
+ * the batch is buffered in {@code requestCache}. If buffering fills the cache, the popped
+ * wrapper (not necessarily this batch) is written and its own handler is completed; otherwise
+ * this call returns without completing any handler.
+ *
+ * @param requests requests to apply; an empty list is answered immediately with no statuses
+ * @param resultHandler callback of the RPC that delivered {@code requests}
+ */
+ @Override
+ public void multiLeaderWriteAsync(
+ List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
+ if (requests.isEmpty()) {
+ // nothing to buffer; answer now instead of failing on requests.get(0)
+ resultHandler.onComplete(new TSyncLogRes(new ArrayList<>()));
+ return;
+ }
+ InsertNodeWrapper ready;
+ try {
+ List<InsertNode> insertNodesInAllRequests = new ArrayList<>(requests.size());
+ for (IndexedConsensusRequest indexedRequest : requests) {
+ List<InsertNode> insertNodesInOneRequest =
+ new ArrayList<>(indexedRequest.getRequests().size());
+ for (IConsensusRequest req : indexedRequest.getRequests()) {
+ // PlanNode in IndexedConsensusRequest should always be InsertNode
+ InsertNode innerNode = (InsertNode) getPlanNode(req);
+ innerNode.setSearchIndex(indexedRequest.getSearchIndex());
+ insertNodesInOneRequest.add(innerNode);
+ }
+ insertNodesInAllRequests.add(mergeInsertNodes(insertNodesInOneRequest));
+ }
+ ready =
+ cacheAndGetLatestInsertNode(
+ requests.get(0).getSyncIndex(), insertNodesInAllRequests, resultHandler);
+ } catch (RuntimeException e) {
+ // the batch was not buffered, so only this RPC is affected
+ logger.error(e.getMessage(), e);
+ resultHandler.onError(e);
+ return;
+ }
+ if (ready != null) {
+ writeAndComplete(ready);
}
}
@@ -193,15 +204,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
innerNode.setSearchIndex(indexedRequest.getSearchIndex());
insertNodes.add(innerNode);
}
- if (indexedRequest.getSearchIndex() == ConsensusReqReader.DEFAULT_SEARCH_INDEX) {
- long cacheRequestStartTime = System.nanoTime();
- planNode =
- cacheAndGetLatestInsertNode(
- indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
- StepTracker.trace("cacheAndQueueRequest", cacheRequestStartTime, System.nanoTime());
- } else {
- planNode = mergeInsertNodes(insertNodes);
- }
+ planNode = mergeInsertNodes(insertNodes);
} else {
planNode = getPlanNode(request);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 787dfbf9ad..f5b5bddb52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -77,8 +77,6 @@ public abstract class InsertNode extends WritePlanNode {
*/
protected long searchIndex = NO_CONSENSUS_INDEX;
- protected long syncIndex = NO_CONSENSUS_INDEX;
-
/** Physical address of data region after splitting */
protected TRegionReplicaSet dataRegionReplicaSet;
@@ -155,14 +153,6 @@ public abstract class InsertNode extends WritePlanNode {
return searchIndex;
}
- public void setSyncIndex(long syncIndex) {
- this.syncIndex = syncIndex;
- }
-
- public long getSyncIndex() {
- return syncIndex;
- }
-
/** Search index should start from 1 */
public void setSearchIndex(long searchIndex) {
this.searchIndex = searchIndex;