You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/09 08:31:00 UTC
[iotdb] branch ml_0808_test_exp1_parallel updated: refine codes and add more comments
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_0808_test_exp1_parallel by this push:
new e6d2cf0bc9 refine codes and add more comments
e6d2cf0bc9 is described below
commit e6d2cf0bc9d1156d934638b3e8b4e310e85892b9
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Aug 9 16:30:47 2022 +0800
refine codes and add more comments
---
.../iotdb/consensus/config/MultiLeaderConfig.java | 4 +-
.../multileader/MultiLeaderServerImpl.java | 7 ++-
.../multileader/logdispatcher/LogDispatcher.java | 14 ++---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../consensus/statemachine/BaseStateMachine.java | 28 ++++------
.../statemachine/DataRegionStateMachine.java | 23 ++++----
.../db/mpp/plan/execution/QueryExecution.java | 61 ++++++++++++----------
.../distribution/WriteFragmentParallelPlanner.java | 14 -----
.../plan/planner/plan/node/write/InsertNode.java | 10 ----
9 files changed, 70 insertions(+), 93 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 0355485847..9b4a72a4f1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -132,7 +132,7 @@ public class MultiLeaderConfig {
public static class Builder {
private int rpcSelectorThreadNum = 1;
- private int rpcMinConcurrentClientNum = 16;
+ private int rpcMinConcurrentClientNum = Runtime.getRuntime().availableProcessors();
private int rpcMaxConcurrentClientNum = 65535;
private int thriftServerAwaitTimeForStopService = 60;
private boolean isRpcThriftCompressionEnabled = false;
@@ -249,6 +249,8 @@ public class MultiLeaderConfig {
public static class Builder {
private int maxPendingRequestNumPerNode = 600;
private int maxRequestPerBatch = 30;
+ // (IMPORTANT) The value of this variable should be the same as MAX_REQUEST_CACHE_SIZE
+ // in DataRegionStateMachine
private int maxPendingBatch = 5;
private int maxWaitingTimeForAccumulatingBatchInMs = 500;
private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 096cc160a0..b0cfba2718 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -121,8 +121,13 @@ public class MultiLeaderServerImpl {
}
// TODO wal and memtable
TSStatus result = stateMachine.write(indexedConsensusRequest);
- long offerStartTime = System.nanoTime();
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // The index is used when constructing batch in LogDispatcher. If its value
+ // increases but the corresponding request does not exist or is not put into
+ // the queue, the dispatcher will try to find the request in WAL. This behavior
+ // is not expected and will slow down batch preparation.
+ // So we need to use the lock to ensure that `offer()` and `incrementAndGet()` are
+ // executed together as one atomic step.
synchronized (index) {
logDispatcher.offer(indexedConsensusRequest);
index.incrementAndGet();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 5f29665346..8938d8d03f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -145,7 +145,6 @@ public class LogDispatcher {
private ConsensusReqReader.ReqIterator walEntryiterator;
private long iteratorIndex = 1;
- private int queueCount = 0;
public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
this.peer = peer;
@@ -244,7 +243,6 @@ public class LogDispatcher {
bufferedRequest,
config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
- // impl.getIndexObject().notifyAll();
}
// remove all request that searchIndex < startIndex
Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
@@ -260,7 +258,8 @@ public class LogDispatcher {
// This condition will be executed in several scenarios:
// 1. restart
// 2. The getBatch() is invoked immediately at the moment the PendingRequests are consumed
- // up.
+ // up. To prevent inconsistency here, we synchronize when calculating the value of
+ // `maxIndexWhenBufferedRequestEmpty`
if (bufferedRequest.isEmpty()) {
endIndex = constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty, logBatches);
batch = new PendingBatch(startIndex, endIndex, logBatches);
@@ -270,8 +269,7 @@ public class LogDispatcher {
// Notice that prev searchIndex >= startIndex
Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
IndexedConsensusRequest prev = iterator.next();
- // Prevents gap between logs. For example, some requests are not written into the queue
- // when
+ // Prevents gap between logs. For example, some requests are not written into the queue when
// the queue is full. In this case, requests need to be loaded from the WAL
endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches);
if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
@@ -353,11 +351,6 @@ public class LogDispatcher {
&& logBatches.size() < config.getReplication().getMaxRequestPerBatch()) {
logger.debug("construct from WAL for one Entry, index : {}", currentIndex);
try {
- logger.debug(
- "{} : before wait pendingRequest Size: {}, bufferedRequest size: {}",
- impl.getThisNode().getGroupId(),
- pendingRequest.size(),
- bufferedRequest.size());
walEntryiterator.waitForNextReady();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -368,7 +361,6 @@ public class LogDispatcher {
iteratorIndex = currentIndex;
for (IConsensusRequest innerRequest : data.getRequests()) {
logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), currentIndex, true));
- logger.info("read one entry from WAL for dispatching: {}", data.getSearchIndex());
}
if (currentIndex == maxIndex - 1) {
break;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6d54339800..cb2583e4d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -118,7 +118,7 @@ public class IoTDBConfig {
private int rpcSelectorThreadNum = 1;
/** Min concurrent client number */
- private int rpcMinConcurrentClientNum = 16;
+ private int rpcMinConcurrentClientNum = Runtime.getRuntime().availableProcessors();
/** Max concurrent client number */
private int rpcMaxConcurrentClientNum = 65535;
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 3bd74bd91a..32333f26e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.consensus.statemachine;
-import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -50,22 +49,17 @@ public abstract class BaseStateMachine implements IStateMachine, IStateMachine.E
}
protected PlanNode getPlanNode(IConsensusRequest request) {
- long startTime = System.nanoTime();
- try {
- PlanNode node;
- if (request instanceof ByteBufferConsensusRequest) {
- node = PlanNodeType.deserialize(request.serializeToByteBuffer());
- } else if (request instanceof MultiLeaderConsensusRequest) {
- node = WALEntry.deserializeInsertNode(request.serializeToByteBuffer());
- } else if (request instanceof PlanNode) {
- node = (PlanNode) request;
- } else {
- logger.error("Unexpected IConsensusRequest : {}", request);
- throw new IllegalArgumentException("Unexpected IConsensusRequest!");
- }
- return node;
- } finally {
- StepTracker.trace("deserializePlanNode", startTime, System.nanoTime());
+ PlanNode node;
+ if (request instanceof ByteBufferConsensusRequest) {
+ node = PlanNodeType.deserialize(request.serializeToByteBuffer());
+ } else if (request instanceof MultiLeaderConsensusRequest) {
+ node = WALEntry.deserializeInsertNode(request.serializeToByteBuffer());
+ } else if (request instanceof PlanNode) {
+ node = (PlanNode) request;
+ } else {
+ logger.error("Unexpected IConsensusRequest : {}", request);
+ throw new IllegalArgumentException("Unexpected IConsensusRequest!");
}
+ return node;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 4b7a416740..1af648d335 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.DataSet;
@@ -44,7 +43,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,6 +120,11 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
}
+ /**
+ * This method is used for writes from MultiLeader SyncLog. With this method, we can keep the
+ * write order in the follower the same as in the leader. Besides guaranteeing order, it also
+ * allows the deserialization of PlanNode to be concurrent
+ */
private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
queueLock.lock();
try {
@@ -133,11 +136,18 @@ public class DataRegionStateMachine extends BaseStateMachine {
queueSortCondition.signalAll();
}
while (true) {
+ // If current InsertNode is the next target InsertNode, write it
if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
requestCache.remove(insertNodeWrapper);
nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
break;
}
+ // If no write thread hits nextSyncIndex and the heap is full, write
+ // the request returned by peek(). This keeps the overall write correct when nextSyncIndex
+ // is not set. We don't persist the value of nextSyncIndex, to reduce complexity.
+ // There are some cases where nextSyncIndex is not set:
+ // 1. When the system was just started
+ // 2. When some exception occurs during SyncLog
if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
&& requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
requestCache.remove();
@@ -199,7 +209,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
@Override
- public int compareTo(@NotNull InsertNodeWrapper o) {
+ public int compareTo(InsertNodeWrapper o) {
return Long.compare(startSyncIndex, o.startSyncIndex);
}
@@ -314,12 +324,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
protected TSStatus write(PlanNode planNode) {
- long startTime = System.nanoTime();
- try {
- return planNode.accept(new DataExecutionVisitor(), region);
- } finally {
- StepTracker.trace("StateMachineWrite", startTime, System.nanoTime());
- }
+ return planNode.accept(new DataExecutionVisitor(), region);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 7c2bc8c9bd..842352a7b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
@@ -52,6 +53,9 @@ import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -63,6 +67,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
@@ -462,35 +467,33 @@ public class QueryExecution implements IQueryExecution {
}
// collect redirect info to client for writing
- // if (analysis.getStatement() instanceof InsertBaseStatement) {
- // InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement();
- // List<TEndPoint> redirectNodeList;
- // if (config.isClusterMode()) {
- // redirectNodeList =
- // insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
- // } else {
- // redirectNodeList = Collections.emptyList();
- // }
- // if (insertStatement instanceof InsertRowsStatement
- // || insertStatement instanceof InsertMultiTabletsStatement) {
- // // multiple devices
- // if (statusCode == TSStatusCode.SUCCESS_STATUS) {
- // List<TSStatus> subStatus = new ArrayList<>();
- // tsstatus.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
- // for (TEndPoint endPoint : redirectNodeList) {
- // subStatus.add(
- //
- // StatusUtils.getStatus(TSStatusCode.NEED_REDIRECTION).setRedirectNode(endPoint));
- // }
- // tsstatus.setSubStatus(subStatus);
- // }
- // } else {
- // // single device
- // if (config.isClusterMode()) {
- // tsstatus.setRedirectNode(redirectNodeList.get(0));
- // }
- // }
- // }
+ if (analysis.getStatement() instanceof InsertBaseStatement) {
+ InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement();
+ List<TEndPoint> redirectNodeList;
+ if (config.isClusterMode()) {
+ redirectNodeList = insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
+ } else {
+ redirectNodeList = Collections.emptyList();
+ }
+ if (insertStatement instanceof InsertRowsStatement
+ || insertStatement instanceof InsertMultiTabletsStatement) {
+ // multiple devices
+ if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+ List<TSStatus> subStatus = new ArrayList<>();
+ tsstatus.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ for (TEndPoint endPoint : redirectNodeList) {
+ subStatus.add(
+ StatusUtils.getStatus(TSStatusCode.NEED_REDIRECTION).setRedirectNode(endPoint));
+ }
+ tsstatus.setSubStatus(subStatus);
+ }
+ } else {
+ // single device
+ if (config.isClusterMode()) {
+ tsstatus.setRedirectNode(redirectNodeList.get(0));
+ }
+ }
+ }
return new ExecutionResult(context.getQueryId(), tsstatus);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index cfc17b45cb..4f043a05b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.mpp.plan.planner.distribution;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
@@ -66,20 +64,8 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
queryContext.getQueryType(),
queryContext.getTimeOut());
instance.setDataRegionAndHost(split.getRegionReplicaSet());
- instance.setHostDataNode(fakeSelectDataNode(split.getRegionReplicaSet()));
ret.add(instance);
}
return ret;
}
-
- private TDataNodeLocation fakeSelectDataNode(TRegionReplicaSet regionReplicaSet) {
- String[] candidate = new String[] {"172.20.31.41", "172.20.31.42", "172.20.31.43"};
- int targetIndex = regionReplicaSet.regionId.id % 3;
- for (TDataNodeLocation location : regionReplicaSet.getDataNodeLocations()) {
- if (location.internalEndPoint.getIp().equals(candidate[targetIndex])) {
- return location;
- }
- }
- return null;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index b1420f9499..d9808da701 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -77,8 +77,6 @@ public abstract class InsertNode extends WritePlanNode {
*/
protected long searchIndex = NO_CONSENSUS_INDEX;
- protected long syncIndex = NO_CONSENSUS_INDEX;
-
/** Physical address of data region after splitting */
protected TRegionReplicaSet dataRegionReplicaSet;
@@ -155,14 +153,6 @@ public abstract class InsertNode extends WritePlanNode {
return searchIndex;
}
- public void setSyncIndex(long syncIndex) {
- this.syncIndex = syncIndex;
- }
-
- public long getSyncIndex() {
- return syncIndex;
- }
-
/** Search index should start from 1 */
public void setSearchIndex(long searchIndex) {
this.searchIndex = searchIndex;