You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/18 12:32:21 UTC
[iotdb] branch master updated: [IOTDB-3196] Add search index in InsertNode (#5945)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3e63945619 [IOTDB-3196] Add search index in InsertNode (#5945)
3e63945619 is described below
commit 3e63945619e29bf19e45ec35752a13f6ad6ab4a2
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Wed May 18 20:32:15 2022 +0800
[IOTDB-3196] Add search index in InsertNode (#5945)
* add search index in insert node
* add index
* fix ci
---
.../plan/node/write/InsertMultiTabletsNode.java | 10 ++++++++
.../plan/planner/plan/node/write/InsertNode.java | 30 ++++++++++++++++++++++
.../planner/plan/node/write/InsertRowNode.java | 4 ++-
.../planner/plan/node/write/InsertRowsNode.java | 10 ++++++++
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 10 ++++++++
.../planner/plan/node/write/InsertTabletNode.java | 3 +++
.../plan/node/write/InsertTabletNodeSerdeTest.java | 2 +-
7 files changed, 67 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 13cf231923..8f3f0200c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -116,6 +116,16 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
parentInsertTabletNodeIndexList.add(parentIndex);
}
+ @Override
+ public void setSearchIndex(long index) {
+ insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
+ }
+
+ @Override
+ public void setSafelyDeletedSearchIndex(long index) {
+ insertTabletNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
+ }
+
@Override
public boolean validateAndSetSchema(SchemaTree schemaTree) {
for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 2b79cb36a3..1fa2195dde 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -44,6 +44,10 @@ import java.util.Objects;
import java.util.stream.Collectors;
public abstract class InsertNode extends WritePlanNode implements IConsensusRequest {
+ /** this insert node doesn't need to participate in multi-leader consensus */
+ public static final long NO_CONSENSUS_INDEX = -1;
+ /** no multi-leader consensus, all insert nodes can be safely deleted */
+ public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = Long.MAX_VALUE;
/**
* if the id table is used, this field is the id form of the device path <br>
@@ -67,6 +71,14 @@ public abstract class InsertNode extends WritePlanNode implements IConsensusRequ
*/
protected IDeviceID deviceID;
+ /** this index is used by WAL search; its order should be protected by the upper layer */
+ protected long searchIndex = NO_CONSENSUS_INDEX;
+ /**
+ * this index passes info to the WAL, indicating that insert nodes whose search index is before
+ * this value can be deleted safely
+ */
+ protected long safelyDeletedSearchIndex = DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
+
/** Physical address of data region after splitting */
TRegionReplicaSet dataRegionReplicaSet;
@@ -139,6 +151,22 @@ public abstract class InsertNode extends WritePlanNode implements IConsensusRequ
this.deviceID = deviceID;
}
+ public long getSearchIndex() {
+ return searchIndex;
+ }
+
+ public void setSearchIndex(long searchIndex) {
+ this.searchIndex = searchIndex;
+ }
+
+ public long getSafelyDeletedSearchIndex() {
+ return safelyDeletedSearchIndex;
+ }
+
+ public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {
+ this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
+ }
+
/**
* Deserialize via {@link
* org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType#deserialize(ByteBuffer)}
@@ -153,6 +181,7 @@ public abstract class InsertNode extends WritePlanNode implements IConsensusRequ
throw new NotImplementedException("serializeAttributes of InsertNode is not implemented");
}
+ // region Serialization methods for WAL
/** Serialized size of measurement schemas, ignoring failed time series */
protected int serializeMeasurementSchemasSize() {
int byteLen = 0;
@@ -187,6 +216,7 @@ public abstract class InsertNode extends WritePlanNode implements IConsensusRequ
measurements[i] = measurementSchemas[i].getMeasurementId();
}
}
+ // endregion
public TRegionReplicaSet getRegionReplicaSet() {
return dataRegionReplicaSet;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index 61a8081909..0ab68df815 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -422,7 +422,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
private int subSerializeSize() {
int size = 0;
- size += Long.BYTES;
+ size += Long.BYTES * 2;
size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
return size + serializeMeasurementsAndValuesSize();
}
@@ -478,6 +478,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
}
private void subSerialize(IWALByteBufferView buffer) {
+ buffer.putLong(searchIndex);
buffer.putLong(time);
WALWriteUtils.write(devicePath.getFullPath(), buffer);
serializeMeasurementsAndValues(buffer);
@@ -534,6 +535,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
throws IOException, IllegalPathException {
// we do not store plan node id in wal entry
InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
+ insertNode.setSearchIndex(stream.readLong());
insertNode.setTime(stream.readLong());
insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(stream)));
insertNode.deserializeMeasurementsAndValues(stream);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index 5ca6141a8d..e3c82a5e3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -86,6 +86,16 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
insertRowNodeIndexList.add(index);
}
+ @Override
+ public void setSearchIndex(long index) {
+ insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+ }
+
+ @Override
+ public void setSafelyDeletedSearchIndex(long index) {
+ insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
+ }
+
public Map<Integer, TSStatus> getResults() {
return results;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index eaa8d2f2a8..2fd932c4bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -75,6 +75,16 @@ public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsert
return results;
}
+ @Override
+ public void setSearchIndex(long index) {
+ insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+ }
+
+ @Override
+ public void setSafelyDeletedSearchIndex(long index) {
+ insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
+ }
+
public TSStatus[] getFailingStatus() {
return StatusUtils.getFailingStatus(results, insertRowNodeList.size());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index 24b560cc9a..160b1a94e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -537,6 +537,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
int subSerializeSize(int start, int end) {
int size = 0;
+ size += Long.BYTES;
size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
// measurements size
size += Integer.BYTES;
@@ -612,6 +613,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
void subSerialize(IWALByteBufferView buffer, int start, int end) {
+ buffer.putLong(searchIndex);
WALWriteUtils.write(devicePath.getFullPath(), buffer);
// data types are serialized in measurement schemas
writeMeasurementSchemas(buffer);
@@ -723,6 +725,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
private void subDeserialize(DataInputStream stream) throws IllegalPathException, IOException {
+ searchIndex = stream.readLong();
devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
int measurementSize = stream.readInt();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
index b1c73bc080..b6604aedb3 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
@@ -61,7 +61,7 @@ public class InsertTabletNodeSerdeTest {
}
@Test
- public void TestSerializeAndDeserializeForWAL() throws IllegalPathException, IOException {
+ public void testSerializeAndDeserializeForWAL() throws IllegalPathException, IOException {
InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema();
int serializedSize = insertTabletNode.serializedSize();