You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/04 08:58:14 UTC
[iotdb] branch ml_test_1_async updated: add more metrics for each step in leader and follower
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_test_1_async
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_test_1_async by this push:
new cf18b64979 add more metrics for each step in leader and follower
cf18b64979 is described below
commit cf18b649798382fc160438ebf565462bbd94b04f
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Aug 4 16:57:59 2022 +0800
add more metrics for each step in leader and follower
---
.../multileader/MultiLeaderServerImpl.java | 11 +-
.../multileader/client/DispatchLogHandler.java | 3 +
.../multileader/logdispatcher/LogDispatcher.java | 146 +++++++++++----------
.../service/MultiLeaderRPCServiceProcessor.java | 8 +-
4 files changed, 95 insertions(+), 73 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index f8a08aa553..0fb33fce8e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -110,8 +110,10 @@ public class MultiLeaderServerImpl {
* records the index of the log and writes locally, and then asynchronous replication is performed
*/
public TSStatus write(IConsensusRequest request) {
+ long leaderWriteStartTime = System.nanoTime();
synchronized (stateMachine) {
- long startTime = System.nanoTime();
+ StepTracker.trace("LeaderWriteWaitLock", leaderWriteStartTime, System.nanoTime());
+ long startTimeAfterLock = System.nanoTime();
try {
IndexedConsensusRequest indexedConsensusRequest =
buildIndexedConsensusRequestForLocalRequest(request);
@@ -124,6 +126,8 @@ public class MultiLeaderServerImpl {
}
// TODO wal and memtable
TSStatus result = stateMachine.write(indexedConsensusRequest);
+ StepTracker.trace("stateMachineWrite", startTimeAfterLock, System.nanoTime());
+ long offerStartTime = System.nanoTime();
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
logDispatcher.offer(indexedConsensusRequest);
} else {
@@ -134,10 +138,11 @@ public class MultiLeaderServerImpl {
result.getCode());
index.decrementAndGet();
}
-
+ StepTracker.trace("serializeAndOfferToQueue", offerStartTime, System.nanoTime());
return result;
} finally {
- StepTracker.trace("MultiLeaderWrite", startTime, System.nanoTime());
+ StepTracker.trace("MultiLeaderWriteAfterLock", startTimeAfterLock, System.nanoTime());
+ StepTracker.trace("MultiLeaderWriteWhole", leaderWriteStartTime, System.nanoTime());
}
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
index b4cefed078..ab6cfe655f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.multileader.client;
+import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher.LogDispatcherThread;
import org.apache.iotdb.consensus.multileader.logdispatcher.PendingBatch;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
@@ -37,6 +38,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> {
private final LogDispatcherThread thread;
private final PendingBatch batch;
private int retryCount;
+ private final long startTime = System.nanoTime();
public DispatchLogHandler(LogDispatcherThread thread, PendingBatch batch) {
this.thread = thread;
@@ -45,6 +47,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> {
@Override
public void onComplete(TSyncLogRes response) {
+ StepTracker.trace("leaderSendUtilResponse", 25, startTime, System.nanoTime());
if (response.getStatus().size() == 1
&& response.getStatus().get(0).getCode()
== TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 6eb7c2e7de..3c3f5ae113 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.consensus.multileader.logdispatcher;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.consensus.common.Peer;
@@ -221,7 +222,9 @@ public class LogDispatcher {
}
}
// we may block here if the synchronization pipeline is full
+ long getBatchSlotStartTime = System.nanoTime();
syncStatus.addNextBatch(batch);
+ StepTracker.trace("getBatchSlot", getBatchSlotStartTime, System.nanoTime());
// sends batch asynchronously and migrates the retry logic into the callback handler
sendBatchAsync(batch, new DispatchLogHandler(this, batch));
}
@@ -241,82 +244,89 @@ public class LogDispatcher {
}
public PendingBatch getBatch() {
- PendingBatch batch;
- List<TLogBatch> logBatches = new ArrayList<>();
- long startIndex = syncStatus.getNextSendingIndex();
- logger.debug("[GetBatch] startIndex: {}", startIndex);
- long endIndex;
- if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
- // Use drainTo instead of poll to reduce lock overhead
- logger.debug(
- "{} : pendingRequest Size: {}, bufferedRequest size: {}",
- impl.getThisNode().getGroupId(),
- pendingRequest.size(),
- bufferedRequest.size());
- pendingRequest.drainTo(
- bufferedRequest,
- config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
- // remove all request that searchIndex < startIndex
- Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
- while (iterator.hasNext()) {
- IndexedConsensusRequest request = iterator.next();
- if (request.getSearchIndex() < startIndex) {
- iterator.remove();
- } else {
- break;
+ long getBatchStartTime = System.nanoTime();
+ try {
+ PendingBatch batch;
+ List<TLogBatch> logBatches = new ArrayList<>();
+ long startIndex = syncStatus.getNextSendingIndex();
+ logger.debug("[GetBatch] startIndex: {}", startIndex);
+ long endIndex;
+ if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
+ // Use drainTo instead of poll to reduce lock overhead
+ logger.debug(
+ "{} : pendingRequest Size: {}, bufferedRequest size: {}",
+ impl.getThisNode().getGroupId(),
+ pendingRequest.size(),
+ bufferedRequest.size());
+ pendingRequest.drainTo(
+ bufferedRequest,
+ config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
+ // remove all request that searchIndex < startIndex
+ Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
+ while (iterator.hasNext()) {
+ IndexedConsensusRequest request = iterator.next();
+ if (request.getSearchIndex() < startIndex) {
+ iterator.remove();
+ } else {
+ break;
+ }
}
}
- }
- if (bufferedRequest.isEmpty()) { // only execute this after a restart
- endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1, logBatches);
- batch = new PendingBatch(startIndex, endIndex, logBatches);
- logger.debug(
- "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch);
- } else {
- // Notice that prev searchIndex >= startIndex
- Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
- IndexedConsensusRequest prev = iterator.next();
- // Prevents gap between logs. For example, some requests are not written into the queue when
- // the queue is full. In this case, requests need to be loaded from the WAL
- endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches);
- if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
+ if (bufferedRequest.isEmpty()) { // only execute this after a restart
+ endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1, logBatches);
batch = new PendingBatch(startIndex, endIndex, logBatches);
- logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch);
- return batch;
- }
- constructBatchIndexedFromConsensusRequest(prev, logBatches);
- endIndex = prev.getSearchIndex();
- iterator.remove();
- while (iterator.hasNext()
- && logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) {
- IndexedConsensusRequest current = iterator.next();
- // Prevents gap between logs. For example, some logs are not written into the queue when
+ logger.debug(
+ "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch);
+ } else {
+ // Notice that prev searchIndex >= startIndex
+ Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
+ IndexedConsensusRequest prev = iterator.next();
+ // Prevents gap between logs. For example, some requests are not written into the queue
+ // when
// the queue is full. In this case, requests need to be loaded from the WAL
- if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
- endIndex =
- constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), logBatches);
- if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
- batch = new PendingBatch(startIndex, endIndex, logBatches);
- logger.debug(
- "gap {} : accumulated a {} from queue and wal when gap",
- impl.getThisNode().getGroupId(),
- batch);
- return batch;
- }
+ endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches);
+ if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
+ batch = new PendingBatch(startIndex, endIndex, logBatches);
+ logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch);
+ return batch;
}
- constructBatchIndexedFromConsensusRequest(current, logBatches);
- endIndex = current.getSearchIndex();
- prev = current;
- // We might not be able to remove all the elements in the bufferedRequest in the
- // current function, but that's fine, we'll continue processing these elements in the
- // bufferedRequest the next time we go into the function, they're never lost
+ constructBatchIndexedFromConsensusRequest(prev, logBatches);
+ endIndex = prev.getSearchIndex();
iterator.remove();
+ while (iterator.hasNext()
+ && logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) {
+ IndexedConsensusRequest current = iterator.next();
+ // Prevents gap between logs. For example, some logs are not written into the queue when
+ // the queue is full. In this case, requests need to be loaded from the WAL
+ if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
+ endIndex =
+ constructBatchFromWAL(
+ prev.getSearchIndex(), current.getSearchIndex(), logBatches);
+ if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
+ batch = new PendingBatch(startIndex, endIndex, logBatches);
+ logger.debug(
+ "gap {} : accumulated a {} from queue and wal when gap",
+ impl.getThisNode().getGroupId(),
+ batch);
+ return batch;
+ }
+ }
+ constructBatchIndexedFromConsensusRequest(current, logBatches);
+ endIndex = current.getSearchIndex();
+ prev = current;
+ // We might not be able to remove all the elements in the bufferedRequest in the
+ // current function, but that's fine, we'll continue processing these elements in the
+ // bufferedRequest the next time we go into the function, they're never lost
+ iterator.remove();
+ }
+ batch = new PendingBatch(startIndex, endIndex, logBatches);
+ logger.debug(
+ "{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batch);
}
- batch = new PendingBatch(startIndex, endIndex, logBatches);
- logger.debug(
- "{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batch);
+ return batch;
+ } finally {
+ StepTracker.trace("getBatch()", 25, getBatchStartTime, System.nanoTime());
}
- return batch;
}
public void sendBatchAsync(PendingBatch batch, DispatchLogHandler handler) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index b2018db1d8..c6d675be5e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -53,7 +53,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
@Override
public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> resultHandler) {
- long startTime = System.nanoTime();
+ long syncLogStartTime = System.nanoTime();
try {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -83,18 +83,22 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
: new ByteBufferConsensusRequest(batch.data);
// merge TLogBatch with same search index into one request
if (batch.getSearchIndex() != currentSearchIndex) {
+ long followerWriteStartTime = System.nanoTime();
statuses.add(
impl.getStateMachine()
.write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+ StepTracker.trace("followerWrite", followerWriteStartTime, System.nanoTime());
consensusRequests = new ArrayList<>();
}
consensusRequests.add(request);
}
// write last request
if (!consensusRequests.isEmpty()) {
+ long followerWriteStartTime = System.nanoTime();
statuses.add(
impl.getStateMachine()
.write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+ StepTracker.trace("followerWrite", followerWriteStartTime, System.nanoTime());
}
StepTracker.trace("stateMachineWriteBatch", stateMachineStartTime, System.nanoTime());
}
@@ -104,7 +108,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
} catch (Exception e) {
resultHandler.onError(e);
} finally {
- StepTracker.trace("syncLog", 25, startTime, System.nanoTime());
+ StepTracker.trace("syncLog", 25, syncLogStartTime, System.nanoTime());
}
}