You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/09 04:20:36 UTC
[iotdb] branch ml_0808_test_exp1_parallel updated: change wait/notify to await/signalall to leverage timeout result
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_0808_test_exp1_parallel by this push:
new ddf0dc3b1f change wait/notify to await/signalall to leverage timeout result
ddf0dc3b1f is described below
commit ddf0dc3b1fe96ac492f10847f137511384e0626b
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Aug 9 12:20:23 2022 +0800
change wait/notify to await/signalall to leverage timeout result
---
.../statemachine/DataRegionStateMachine.java | 30 ++++++++++++++++++----
1 file changed, 25 insertions(+), 5 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 12746e9234..0098c69975 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -53,6 +53,10 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
public class DataRegionStateMachine extends BaseStateMachine {
@@ -66,6 +70,8 @@ public class DataRegionStateMachine extends BaseStateMachine {
private static final int MAX_REQUEST_CACHE_SIZE = 5;
private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
+ private final Lock queueLock = new ReentrantLock();
+ private final Condition queueSortCondition = queueLock.newCondition();
private final PriorityQueue<InsertNodeWrapper> requestCache;
private long nextSyncIndex = -1;
@@ -120,13 +126,14 @@ public class DataRegionStateMachine extends BaseStateMachine {
long cacheRequestStartTime = System.nanoTime();
logger.info(
"syncIndex = {}, nextSyncIndex = {}", insertNodeWrapper.startSyncIndex, nextSyncIndex);
- synchronized (requestCache) {
+ queueLock.lock();
+ try {
requestCache.add(insertNodeWrapper);
// If the peek is not held by the current thread, the thread holding the peek should be
// notified to process it once the queue is full
if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
&& requestCache.peek().getStartSyncIndex() != insertNodeWrapper.getStartSyncIndex()) {
- requestCache.notifyAll();
+ queueSortCondition.signalAll();
}
while (true) {
if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
@@ -141,8 +148,20 @@ public class DataRegionStateMachine extends BaseStateMachine {
break;
}
try {
- requestCache.wait(CACHE_WINDOW_TIME_IN_MS);
+ boolean timeout =
+ !queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
+ if (timeout) {
+ logger.info(
+ "waiting target request timeout. current index: {}, target index: {}",
+ insertNodeWrapper.getStartSyncIndex(),
+ nextSyncIndex);
+ break;
+ }
} catch (InterruptedException e) {
+ logger.warn(
+ "current waiting is interrupted. SyncIndex: {}. Exception: {}",
+ insertNodeWrapper.getStartSyncIndex(),
+ e);
Thread.currentThread().interrupt();
}
}
@@ -156,9 +175,10 @@ public class DataRegionStateMachine extends BaseStateMachine {
for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
subStatus.add(write(insertNode));
}
- // TODO: think about notifying until processing the last request in this batch
- requestCache.notifyAll();
+ queueSortCondition.signalAll();
return new TSStatus().setSubStatus(subStatus);
+ } finally {
+ queueLock.unlock();
}
}