You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/09 07:02:12 UTC

[iotdb] branch ml_0808_test_exp1_parallel updated: add another constraint to keep the write order as far as possible

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_0808_test_exp1_parallel by this push:
     new dc06fca1b9 add another constraint to keep the write order as far as possible
dc06fca1b9 is described below

commit dc06fca1b9f270b304c8145182b13d2048b98e84
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Aug 9 15:01:59 2022 +0800

    add another constraint to keep the write order as far as possible
---
 .../statemachine/DataRegionStateMachine.java        | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index b182a811cf..f6f1628d2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -155,12 +155,21 @@ public class DataRegionStateMachine extends BaseStateMachine {
           boolean timeout =
               !queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
           if (timeout) {
-            logger.info(
-                "waiting target request timeout. current index: {}, target index: {}",
-                insertNodeWrapper.getStartSyncIndex(),
-                nextSyncIndex);
-            requestCache.remove(insertNodeWrapper);
-            break;
+            if (requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
+              // current thread hold the peek request thus it can write the peek immediately.
+              logger.info(
+                  "waiting target request timeout. current index: {}, target index: {}",
+                  insertNodeWrapper.getStartSyncIndex(),
+                  nextSyncIndex);
+              requestCache.remove(insertNodeWrapper);
+              break;
+            } else {
+              // although the timeout is triggered, current thread cannot write its request
+              // because there should be some other thread who hold the peek request.
+              // And current thread should signal all the other threads to let the thread
+              // who holds the peed request to execute write operation.
+              queueSortCondition.signalAll();
+            }
           }
         } catch (InterruptedException e) {
           logger.warn(