You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/09 07:02:12 UTC
[iotdb] branch ml_0808_test_exp1_parallel updated: add another constraint to keep the write order as far as possible
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_0808_test_exp1_parallel by this push:
new dc06fca1b9 add another constraint to keep the write order as far as possible
dc06fca1b9 is described below
commit dc06fca1b9f270b304c8145182b13d2048b98e84
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Aug 9 15:01:59 2022 +0800
add another constraint to keep the write order as far as possible
---
.../statemachine/DataRegionStateMachine.java | 21 +++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index b182a811cf..f6f1628d2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -155,12 +155,21 @@ public class DataRegionStateMachine extends BaseStateMachine {
boolean timeout =
!queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
if (timeout) {
- logger.info(
- "waiting target request timeout. current index: {}, target index: {}",
- insertNodeWrapper.getStartSyncIndex(),
- nextSyncIndex);
- requestCache.remove(insertNodeWrapper);
- break;
+ if (requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
+ // the current thread holds the peek request, thus it can write that request immediately.
+ logger.info(
+ "waiting target request timeout. current index: {}, target index: {}",
+ insertNodeWrapper.getStartSyncIndex(),
+ nextSyncIndex);
+ requestCache.remove(insertNodeWrapper);
+ break;
+ } else {
+ // Although the timeout was triggered, the current thread cannot write its request
+ // because some other thread should be holding the peek request.
+ // So the current thread signals all the other threads to let the thread
+ // that holds the peek request execute its write operation.
+ queueSortCondition.signalAll();
+ }
}
} catch (InterruptedException e) {
logger.warn(