You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/01 08:03:06 UTC
[iotdb] branch ml_0729_test updated: discard new object creation
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0729_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_0729_test by this push:
new b07fe48543 discard new object creation
b07fe48543 is described below
commit b07fe48543418c025d731c49d960cb8a175f1366
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Aug 1 16:02:51 2022 +0800
discard new object creation
---
.../statemachine/DataRegionStateMachine.java | 38 ++++++++++++++++++----
.../plan/planner/plan/node/write/InsertNode.java | 10 ++++++
2 files changed, 41 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index fbb1020e8f..0fd9dc5a76 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
@@ -64,11 +65,11 @@ public class DataRegionStateMachine extends BaseStateMachine {
private static final int MAX_REQUEST_CACHE_SIZE = 5;
private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
- private final PriorityQueue<InsertNodeWrapper> requestCache;
+ private final PriorityQueue<InsertNode> requestCache;
public DataRegionStateMachine(DataRegion region) {
this.region = region;
- this.requestCache = new PriorityQueue<>();
+ this.requestCache = new PriorityQueue<>(Comparator.comparingLong(InsertNode::getSyncIndex));
}
@Override
@@ -114,9 +115,9 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
private InsertNode cacheAndGetLatestInsertNode(long syncIndex, InsertNode insertNode) {
-
+ insertNode.setSyncIndex(syncIndex);
synchronized (requestCache) {
- requestCache.add(new InsertNodeWrapper(syncIndex, insertNode));
+ requestCache.add(insertNode);
requestCache.notifyAll();
while (!(requestCache.size() >= MAX_REQUEST_CACHE_SIZE
&& requestCache.peek().getSyncIndex() == syncIndex)) {
@@ -127,12 +128,35 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
}
requestCache.notifyAll();
- InsertNodeWrapper wrapper = requestCache.poll();
- logger.info("queue size {}, syncIndex = {}", requestCache.size(), wrapper.getSyncIndex());
- return wrapper.getInsertNode();
+ InsertNode nextNode = requestCache.poll();
+ // logger.info("queue size {}, syncIndex = {}", requestCache.size(),
+ // nextNode.getSyncIndex());
+ return nextNode;
}
}
+ // private InsertNode cacheAndGetLatestInsertNode2(long syncIndex, InsertNode insertNode) {
+ // insertNode.setSyncIndex(syncIndex);
+ // synchronized (requestCache) {
+ // requestCache.add(insertNode);
+ // if (requestCache.size() == MAX_REQUEST_CACHE_SIZE) {
+ // if (insertNode == requestCache.peek()) {
+ // return requestCache.poll();
+ // } else {
+ // requestCache.poll().notify();
+ // }
+ // }
+ // }
+ //
+ // synchronized (requestCache) {
+ // try {
+ // insertNode.wait(CACHE_WINDOW_TIME_IN_MS);
+ // } catch (InterruptedException e) {
+ // Thread.currentThread().interrupt();
+ // }
+ // }
+ // }
+
private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
private final long syncIndex;
private final InsertNode insertNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index f5b5bddb52..787dfbf9ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -77,6 +77,8 @@ public abstract class InsertNode extends WritePlanNode {
*/
protected long searchIndex = NO_CONSENSUS_INDEX;
+ protected long syncIndex = NO_CONSENSUS_INDEX;
+
/** Physical address of data region after splitting */
protected TRegionReplicaSet dataRegionReplicaSet;
@@ -153,6 +155,14 @@ public abstract class InsertNode extends WritePlanNode {
return searchIndex;
}
+ public void setSyncIndex(long syncIndex) {
+ this.syncIndex = syncIndex;
+ }
+
+ public long getSyncIndex() {
+ return syncIndex;
+ }
+
/** Search index should start from 1 */
public void setSearchIndex(long searchIndex) {
this.searchIndex = searchIndex;