You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/29 10:30:44 UTC
[iotdb] 01/02: add new policy
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0729_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e8a4889eb39fe23379451ccce1626f2ce345b0b0
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Jul 29 18:28:50 2022 +0800
add new policy
---
.../statemachine/DataRegionStateMachine.java | 42 ++++++++++++++--------
1 file changed, 28 insertions(+), 14 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 40f378b24c..b657ce767f 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -50,6 +50,9 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
public class DataRegionStateMachine extends BaseStateMachine {
@@ -60,8 +63,11 @@ public class DataRegionStateMachine extends BaseStateMachine {
private DataRegion region;
- private static final int MAX_REQUEST_CACHE_SIZE = 50;
+ private static final int MAX_REQUEST_CACHE_SIZE = 5;
+ private static final long CACHE_WINDOW_TIME_IN_MS = 1000;
private final PriorityQueue<InsertNodeWrapper> requestCache;
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition cacheCondition = lock.newCondition();
public DataRegionStateMachine(DataRegion region) {
this.region = region;
@@ -111,17 +117,25 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
private InsertNode cacheAndGetLatestInsertNode(long syncIndex, InsertNode insertNode) {
- synchronized (requestCache) {
+ lock.lock(); // taken before try so that finally never unlocks a lock this thread does not hold
+ try {
requestCache.add(new InsertNodeWrapper(syncIndex, insertNode));
- // while(!(requestCache.size() >= MAX_REQUEST_CACHE_SIZE &&
- // requestCache.peek().getSyncIndex() == syncIndex)) {
- // requestCache.wait();
- // }
- if (requestCache.size() >= MAX_REQUEST_CACHE_SIZE) {
- return requestCache.poll().getInsertNode();
- } else {
- return null;
+ // Wait until the cache is full and its head entry has this syncIndex, or the window expires.
+ while (!(requestCache.size() >= MAX_REQUEST_CACHE_SIZE
+ && requestCache.peek().getSyncIndex() == syncIndex)) {
+ try {
+ if (!cacheCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS)) {
+ break; // await() returned false: the wait timed out
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
+ break; // retrying await() would rethrow at once and spin while holding the lock
+ }
}
+ cacheCondition.signalAll(); // Condition API; Object.notifyAll() needs this object's monitor
+ return requestCache.poll().getInsertNode();
+ } finally {
+ lock.unlock();
}
}
@@ -161,10 +175,10 @@ public class DataRegionStateMachine extends BaseStateMachine {
innerNode.setSearchIndex(indexedRequest.getSearchIndex());
insertNodes.add(innerNode);
}
- planNode = mergeInsertNodes(insertNodes);
- // planNode =
- // cacheAndGetLatestInsertNode(
- // indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
+ // planNode = mergeInsertNodes(insertNodes);
+ planNode =
+ cacheAndGetLatestInsertNode(
+ indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
// TODO: tmp way to do the test
if (planNode == null) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());