You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/09 04:20:36 UTC

[iotdb] branch ml_0808_test_exp1_parallel updated: change wait/notify to await/signalall to leverage timeout result

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_0808_test_exp1_parallel by this push:
     new ddf0dc3b1f change wait/notify to await/signalall to leverage timeout result
ddf0dc3b1f is described below

commit ddf0dc3b1fe96ac492f10847f137511384e0626b
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Aug 9 12:20:23 2022 +0800

    change wait/notify to await/signalall to leverage timeout result
---
 .../statemachine/DataRegionStateMachine.java       | 30 ++++++++++++++++++----
 1 file changed, 25 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 12746e9234..0098c69975 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -53,6 +53,10 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class DataRegionStateMachine extends BaseStateMachine {
 
@@ -66,6 +70,8 @@ public class DataRegionStateMachine extends BaseStateMachine {
   private static final int MAX_REQUEST_CACHE_SIZE = 5;
   private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
 
+  private final Lock queueLock = new ReentrantLock();
+  private final Condition queueSortCondition = queueLock.newCondition();
   private final PriorityQueue<InsertNodeWrapper> requestCache;
   private long nextSyncIndex = -1;
 
@@ -120,13 +126,14 @@ public class DataRegionStateMachine extends BaseStateMachine {
     long cacheRequestStartTime = System.nanoTime();
     logger.info(
         "syncIndex = {}, nextSyncIndex = {}", insertNodeWrapper.startSyncIndex, nextSyncIndex);
-    synchronized (requestCache) {
+    queueLock.lock();
+    try {
       requestCache.add(insertNodeWrapper);
       // If the peek is not hold by current thread, it should notify the corresponding thread to
       // process the peek when the queue is full
       if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
           && requestCache.peek().getStartSyncIndex() != insertNodeWrapper.getStartSyncIndex()) {
-        requestCache.notifyAll();
+        queueSortCondition.signalAll();
       }
       while (true) {
         if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
@@ -141,8 +148,20 @@ public class DataRegionStateMachine extends BaseStateMachine {
           break;
         }
         try {
-          requestCache.wait(CACHE_WINDOW_TIME_IN_MS);
+          boolean timeout =
+              !queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
+          if (timeout) {
+            logger.info(
+                "waiting target request timeout. current index: {}, target index: {}",
+                insertNodeWrapper.getStartSyncIndex(),
+                nextSyncIndex);
+            break;
+          }
         } catch (InterruptedException e) {
+          logger.warn(
+              "current waiting is interrupted. SyncIndex: {}. Exception: {}",
+              insertNodeWrapper.getStartSyncIndex(),
+              e);
           Thread.currentThread().interrupt();
         }
       }
@@ -156,9 +175,10 @@ public class DataRegionStateMachine extends BaseStateMachine {
       for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
         subStatus.add(write(insertNode));
       }
-      // TODO: think about notifying until processing the last request in this batch
-      requestCache.notifyAll();
+      queueSortCondition.signalAll();
       return new TSStatus().setSubStatus(subStatus);
+    } finally {
+      queueLock.unlock();
     }
   }