You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/29 10:30:44 UTC

[iotdb] 01/02: add new policy

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e8a4889eb39fe23379451ccce1626f2ce345b0b0
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Jul 29 18:28:50 2022 +0800

    add new policy
---
 .../statemachine/DataRegionStateMachine.java       | 42 ++++++++++++++--------
 1 file changed, 28 insertions(+), 14 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 40f378b24c..b657ce767f 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -50,6 +50,9 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class DataRegionStateMachine extends BaseStateMachine {
 
@@ -60,8 +63,11 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   private DataRegion region;
 
-  private static final int MAX_REQUEST_CACHE_SIZE = 50;
+  private static final int MAX_REQUEST_CACHE_SIZE = 5;
+  private static final long CACHE_WINDOW_TIME_IN_MS = 1000;
   private final PriorityQueue<InsertNodeWrapper> requestCache;
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition cacheCondition = lock.newCondition();
 
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
@@ -111,17 +117,25 @@ public class DataRegionStateMachine extends BaseStateMachine {
   }
 
   private InsertNode cacheAndGetLatestInsertNode(long syncIndex, InsertNode insertNode) {
-    synchronized (requestCache) {
+    lock.lock();
+    try {
       requestCache.add(new InsertNodeWrapper(syncIndex, insertNode));
-      //      while(!(requestCache.size() >= MAX_REQUEST_CACHE_SIZE &&
-      // requestCache.peek().getSyncIndex() == syncIndex)) {
-      //        requestCache.wait();
-      //      }
-      if (requestCache.size() >= MAX_REQUEST_CACHE_SIZE) {
-        return requestCache.poll().getInsertNode();
-      } else {
-        return null;
+      while (!(requestCache.size() >= MAX_REQUEST_CACHE_SIZE
+          && requestCache.peek().getSyncIndex() == syncIndex)) {
+        try {
+          if (!cacheCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS)) {
+            break;
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          break;
+        }
       }
+      // Condition waiters are woken with signalAll(); notifyAll() needs the object's monitor.
+      cacheCondition.signalAll();
+      return requestCache.poll().getInsertNode();
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -161,10 +175,10 @@ public class DataRegionStateMachine extends BaseStateMachine {
           innerNode.setSearchIndex(indexedRequest.getSearchIndex());
           insertNodes.add(innerNode);
         }
-        planNode = mergeInsertNodes(insertNodes);
-        //        planNode =
-        //            cacheAndGetLatestInsertNode(
-        //                indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
+        //        planNode = mergeInsertNodes(insertNodes);
+        planNode =
+            cacheAndGetLatestInsertNode(
+                indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
         // TODO: tmp way to do the test
         if (planNode == null) {
           return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());