You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/18 12:32:21 UTC

[iotdb] branch master updated: [IOTDB-3196] Add search index in InsertNode (#5945)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e63945619 [IOTDB-3196] Add search index in InsertNode (#5945)
3e63945619 is described below

commit 3e63945619e29bf19e45ec35752a13f6ad6ab4a2
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Wed May 18 20:32:15 2022 +0800

    [IOTDB-3196] Add search index in InsertNode (#5945)
    
    * add search index in insert node
    
    * add index
    
    * fix ci
---
 .../plan/node/write/InsertMultiTabletsNode.java    | 10 ++++++++
 .../plan/planner/plan/node/write/InsertNode.java   | 30 ++++++++++++++++++++++
 .../planner/plan/node/write/InsertRowNode.java     |  4 ++-
 .../planner/plan/node/write/InsertRowsNode.java    | 10 ++++++++
 .../plan/node/write/InsertRowsOfOneDeviceNode.java | 10 ++++++++
 .../planner/plan/node/write/InsertTabletNode.java  |  3 +++
 .../plan/node/write/InsertTabletNodeSerdeTest.java |  2 +-
 7 files changed, 67 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 13cf231923..8f3f0200c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -116,6 +116,16 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
     parentInsertTabletNodeIndexList.add(parentIndex);
   }
 
+  @Override
+  public void setSearchIndex(long index) {
+    insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
+  }
+
+  @Override
+  public void setSafelyDeletedSearchIndex(long index) {
+    insertTabletNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
+  }
+
   @Override
   public boolean validateAndSetSchema(SchemaTree schemaTree) {
     for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 2b79cb36a3..1fa2195dde 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -44,6 +44,10 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 public abstract class InsertNode extends WritePlanNode implements IConsensusRequest {
+  /** this insert node doesn't need to participate in multi-leader consensus */
+  public static final long NO_CONSENSUS_INDEX = -1;
+  /** without multi-leader consensus, all insert nodes can be safely deleted */
+  public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = Long.MAX_VALUE;
 
   /**
    * if use id table, this filed is id form of device path <br>
@@ -67,6 +71,14 @@ public abstract class InsertNode extends WritePlanNode implements IConsensusRequ
    */
   protected IDeviceID deviceID;
 
+  /** this index is used by wal search; its order should be guaranteed by the upper layer */
+  protected long searchIndex = NO_CONSENSUS_INDEX;
+  /**
+   * this index passes info to the wal, indicating that insert nodes whose search index is before
+   * this value can be deleted safely
+   */
+  protected long safelyDeletedSearchIndex = DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
+
   /** Physical address of data region after splitting */
   TRegionReplicaSet dataRegionReplicaSet;
 
@@ -139,6 +151,22 @@ public abstract class InsertNode extends WritePlanNode implements IConsensusRequ
     this.deviceID = deviceID;
   }
 
+  public long getSearchIndex() {
+    return searchIndex;
+  }
+
+  public void setSearchIndex(long searchIndex) {
+    this.searchIndex = searchIndex;
+  }
+
+  public long getSafelyDeletedSearchIndex() {
+    return safelyDeletedSearchIndex;
+  }
+
+  public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {
+    this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
+  }
+
   /**
    * Deserialize via {@link
    * org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType#deserialize(ByteBuffer)}
@@ -153,6 +181,7 @@ public abstract class InsertNode extends WritePlanNode implements IConsensusRequ
     throw new NotImplementedException("serializeAttributes of InsertNode is not implemented");
   }
 
+  // region Serialization methods for WAL
   /** Serialized size of measurement schemas, ignoring failed time series */
   protected int serializeMeasurementSchemasSize() {
     int byteLen = 0;
@@ -187,6 +216,7 @@ public abstract class InsertNode extends WritePlanNode implements IConsensusRequ
       measurements[i] = measurementSchemas[i].getMeasurementId();
     }
   }
+  // endregion
 
   public TRegionReplicaSet getRegionReplicaSet() {
     return dataRegionReplicaSet;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index 61a8081909..0ab68df815 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -422,7 +422,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
 
   private int subSerializeSize() {
     int size = 0;
-    size += Long.BYTES;
+    size += Long.BYTES * 2;
     size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
     return size + serializeMeasurementsAndValuesSize();
   }
@@ -478,6 +478,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   }
 
   private void subSerialize(IWALByteBufferView buffer) {
+    buffer.putLong(searchIndex);
     buffer.putLong(time);
     WALWriteUtils.write(devicePath.getFullPath(), buffer);
     serializeMeasurementsAndValues(buffer);
@@ -534,6 +535,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
       throws IOException, IllegalPathException {
     // we do not store plan node id in wal entry
     InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
+    insertNode.setSearchIndex(stream.readLong());
     insertNode.setTime(stream.readLong());
     insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(stream)));
     insertNode.deserializeMeasurementsAndValues(stream);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index 5ca6141a8d..e3c82a5e3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -86,6 +86,16 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
     insertRowNodeIndexList.add(index);
   }
 
+  @Override
+  public void setSearchIndex(long index) {
+    insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+  }
+
+  @Override
+  public void setSafelyDeletedSearchIndex(long index) {
+    insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
+  }
+
   public Map<Integer, TSStatus> getResults() {
     return results;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index eaa8d2f2a8..2fd932c4bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -75,6 +75,16 @@ public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsert
     return results;
   }
 
+  @Override
+  public void setSearchIndex(long index) {
+    insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+  }
+
+  @Override
+  public void setSafelyDeletedSearchIndex(long index) {
+    insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
+  }
+
   public TSStatus[] getFailingStatus() {
     return StatusUtils.getFailingStatus(results, insertRowNodeList.size());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index 24b560cc9a..160b1a94e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -537,6 +537,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
 
   int subSerializeSize(int start, int end) {
     int size = 0;
+    size += Long.BYTES;
     size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
     // measurements size
     size += Integer.BYTES;
@@ -612,6 +613,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   }
 
   void subSerialize(IWALByteBufferView buffer, int start, int end) {
+    buffer.putLong(searchIndex);
     WALWriteUtils.write(devicePath.getFullPath(), buffer);
     // data types are serialized in measurement schemas
     writeMeasurementSchemas(buffer);
@@ -723,6 +725,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   }
 
   private void subDeserialize(DataInputStream stream) throws IllegalPathException, IOException {
+    searchIndex = stream.readLong();
     devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
 
     int measurementSize = stream.readInt();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
index b1c73bc080..b6604aedb3 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
@@ -61,7 +61,7 @@ public class InsertTabletNodeSerdeTest {
   }
 
   @Test
-  public void TestSerializeAndDeserializeForWAL() throws IllegalPathException, IOException {
+  public void testSerializeAndDeserializeForWAL() throws IllegalPathException, IOException {
     InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema();
 
     int serializedSize = insertTabletNode.serializedSize();