You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/29 10:30:43 UTC

[iotdb] branch ml_0729_test updated (6405f8096e -> e731197ec3)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch ml_0729_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 6405f8096e raw mode
     new e8a4889eb3 add new policy
     new e731197ec3 test new plan

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../statemachine/DataRegionStateMachine.java       | 46 +++++++++++++---------
 1 file changed, 28 insertions(+), 18 deletions(-)


[iotdb] 02/02: test new plan

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e731197ec3a07167aba2aa2708a0a61951a6fe9a
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Jul 29 18:30:26 2022 +0800

    test new plan
---
 .../iotdb/db/consensus/statemachine/DataRegionStateMachine.java       | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index b657ce767f..b83fbf1a0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -179,10 +179,6 @@ public class DataRegionStateMachine extends BaseStateMachine {
         planNode =
             cacheAndGetLatestInsertNode(
                 indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
-        // TODO: tmp way to do the test
-        if (planNode == null) {
-          return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-        }
       } else {
         planNode = getPlanNode(request);
       }


[iotdb] 01/02: add new policy

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e8a4889eb39fe23379451ccce1626f2ce345b0b0
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Jul 29 18:28:50 2022 +0800

    add new policy
---
 .../statemachine/DataRegionStateMachine.java       | 42 ++++++++++++++--------
 1 file changed, 28 insertions(+), 14 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 40f378b24c..b657ce767f 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -50,6 +50,9 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class DataRegionStateMachine extends BaseStateMachine {
 
@@ -60,8 +63,11 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   private DataRegion region;
 
-  private static final int MAX_REQUEST_CACHE_SIZE = 50;
+  private static final int MAX_REQUEST_CACHE_SIZE = 5;
+  private static final long CACHE_WINDOW_TIME_IN_MS = 1000;
   private final PriorityQueue<InsertNodeWrapper> requestCache;
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition cacheCondition = lock.newCondition();
 
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
@@ -111,17 +117,25 @@ public class DataRegionStateMachine extends BaseStateMachine {
   }
 
   private InsertNode cacheAndGetLatestInsertNode(long syncIndex, InsertNode insertNode) {
-    synchronized (requestCache) {
+    try {
+      lock.lock();
       requestCache.add(new InsertNodeWrapper(syncIndex, insertNode));
-      //      while(!(requestCache.size() >= MAX_REQUEST_CACHE_SIZE &&
-      // requestCache.peek().getSyncIndex() == syncIndex)) {
-      //        requestCache.wait();
-      //      }
-      if (requestCache.size() >= MAX_REQUEST_CACHE_SIZE) {
-        return requestCache.poll().getInsertNode();
-      } else {
-        return null;
+      while (!(requestCache.size() >= MAX_REQUEST_CACHE_SIZE
+          && requestCache.peek().getSyncIndex() == syncIndex)) {
+        try {
+          boolean timeoutTriggered =
+              !cacheCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
+          if (timeoutTriggered) {
+            break;
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
       }
+      cacheCondition.notifyAll();
+      return requestCache.poll().getInsertNode();
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -161,10 +175,10 @@ public class DataRegionStateMachine extends BaseStateMachine {
           innerNode.setSearchIndex(indexedRequest.getSearchIndex());
           insertNodes.add(innerNode);
         }
-        planNode = mergeInsertNodes(insertNodes);
-        //        planNode =
-        //            cacheAndGetLatestInsertNode(
-        //                indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
+        //        planNode = mergeInsertNodes(insertNodes);
+        planNode =
+            cacheAndGetLatestInsertNode(
+                indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
         // TODO: tmp way to do the test
         if (planNode == null) {
           return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());