You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/01 08:03:06 UTC

[iotdb] branch ml_0729_test updated: discard new object creation

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_0729_test by this push:
     new b07fe48543 discard new object creation
b07fe48543 is described below

commit b07fe48543418c025d731c49d960cb8a175f1366
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Aug 1 16:02:51 2022 +0800

    discard new object creation
---
 .../statemachine/DataRegionStateMachine.java       | 38 ++++++++++++++++++----
 .../plan/planner/plan/node/write/InsertNode.java   | 10 ++++++
 2 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index fbb1020e8f..0fd9dc5a76 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
 
@@ -64,11 +65,11 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   private static final int MAX_REQUEST_CACHE_SIZE = 5;
   private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
-  private final PriorityQueue<InsertNodeWrapper> requestCache;
+  private final PriorityQueue<InsertNode> requestCache;
 
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
-    this.requestCache = new PriorityQueue<>();
+    this.requestCache = new PriorityQueue<>(Comparator.comparingLong(InsertNode::getSyncIndex));
   }
 
   @Override
@@ -114,9 +115,9 @@ public class DataRegionStateMachine extends BaseStateMachine {
   }
 
   private InsertNode cacheAndGetLatestInsertNode(long syncIndex, InsertNode insertNode) {
-
+    insertNode.setSyncIndex(syncIndex);
     synchronized (requestCache) {
-      requestCache.add(new InsertNodeWrapper(syncIndex, insertNode));
+      requestCache.add(insertNode);
       requestCache.notifyAll();
       while (!(requestCache.size() >= MAX_REQUEST_CACHE_SIZE
           && requestCache.peek().getSyncIndex() == syncIndex)) {
@@ -127,12 +128,35 @@ public class DataRegionStateMachine extends BaseStateMachine {
         }
       }
       requestCache.notifyAll();
-      InsertNodeWrapper wrapper = requestCache.poll();
-      logger.info("queue size {}, syncIndex = {}", requestCache.size(), wrapper.getSyncIndex());
-      return wrapper.getInsertNode();
+      InsertNode nextNode = requestCache.poll();
+      //      logger.info("queue size {}, syncIndex = {}", requestCache.size(),
+      // nextNode.getSyncIndex());
+      return nextNode;
     }
   }
 
+  //  private InsertNode cacheAndGetLatestInsertNode2(long syncIndex, InsertNode insertNode) {
+  //    insertNode.setSyncIndex(syncIndex);
+  //    synchronized (requestCache) {
+  //      requestCache.add(insertNode);
+  //      if (requestCache.size() == MAX_REQUEST_CACHE_SIZE) {
+  //        if (insertNode == requestCache.peek()) {
+  //          return requestCache.poll();
+  //        } else {
+  //          requestCache.poll().notify();
+  //        }
+  //      }
+  //    }
+  //
+  //    synchronized (requestCache) {
+  //      try {
+  //        insertNode.wait(CACHE_WINDOW_TIME_IN_MS);
+  //      } catch (InterruptedException e) {
+  //        Thread.currentThread().interrupt();
+  //      }
+  //    }
+  //  }
+
   private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
     private final long syncIndex;
     private final InsertNode insertNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index f5b5bddb52..787dfbf9ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -77,6 +77,8 @@ public abstract class InsertNode extends WritePlanNode {
    */
   protected long searchIndex = NO_CONSENSUS_INDEX;
 
+  protected long syncIndex = NO_CONSENSUS_INDEX;
+
   /** Physical address of data region after splitting */
   protected TRegionReplicaSet dataRegionReplicaSet;
 
@@ -153,6 +155,14 @@ public abstract class InsertNode extends WritePlanNode {
     return searchIndex;
   }
 
+  public void setSyncIndex(long syncIndex) {
+    this.syncIndex = syncIndex;
+  }
+
+  public long getSyncIndex() {
+    return syncIndex;
+  }
+
   /** Search index should start from 1 */
   public void setSearchIndex(long searchIndex) {
     this.searchIndex = searchIndex;