You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/16 03:39:32 UTC
[iotdb] branch handle_redirection_during_dispatching updated: use preferred location to redirect
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch handle_redirection_during_dispatching
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/handle_redirection_during_dispatching by this push:
new 268fae19a0 use preferred location to redirect
268fae19a0 is described below
commit 268fae19a0cd4f83be5ee1b24f11a4ece44e1726
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue May 16 11:42:26 2023 +0800
use preferred location to redirect
---
.../planner/plan/node/write/InsertTabletNode.java | 82 ++++++++++++++++------
1 file changed, 59 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index 09fa7c92b7..8348dbac45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -160,7 +160,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
@Override
- public void addChild(PlanNode child) {}
+ public void addChild(PlanNode child) {
+ }
@Override
public PlanNode clone() {
@@ -239,12 +240,12 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
.getDataRegionReplicaSetForWriting(devicePath.getFullPath(), timePartitionSlots);
// collect redirectInfo
+ TRegionReplicaSet tRegionReplicaSet = dataRegionReplicaSets
+ .get(dataRegionReplicaSets.size() - 1);
+ int preferredLocation =
+ tRegionReplicaSet.isSetPreferredLocation() ? tRegionReplicaSet.preferredLocation : 0;
analysis.addEndPointToRedirectNodeList(
- dataRegionReplicaSets
- .get(dataRegionReplicaSets.size() - 1)
- .getDataNodeLocations()
- .get(0)
- .getClientRpcEndPoint());
+ tRegionReplicaSet.getDataNodeLocations().get(preferredLocation).getClientRpcEndPoint());
Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
for (int i = 0; i < dataRegionReplicaSets.size(); i++) {
@@ -447,7 +448,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
ReadWriteIOUtils.write((byte) (isAligned ? 1 : 0), stream);
}
- /** Serialize measurements or measurement schemas, ignoring failed time series */
+ /**
+ * Serialize measurements or measurement schemas, ignoring failed time series
+ */
private void writeMeasurementsOrSchemas(ByteBuffer buffer) {
ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), buffer);
ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), buffer);
@@ -466,7 +469,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
}
- /** Serialize measurements or measurement schemas, ignoring failed time series */
+ /**
+ * Serialize measurements or measurement schemas, ignoring failed time series
+ */
private void writeMeasurementsOrSchemas(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), stream);
ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), stream);
@@ -485,7 +490,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
}
- /** Serialize data types, ignoring failed time series */
+ /**
+ * Serialize data types, ignoring failed time series
+ */
private void writeDataTypes(ByteBuffer buffer) {
for (int i = 0; i < dataTypes.length; i++) {
// ignore failed partial insert
@@ -496,7 +503,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
}
- /** Serialize data types, ignoring failed time series */
+ /**
+ * Serialize data types, ignoring failed time series
+ */
private void writeDataTypes(DataOutputStream stream) throws IOException {
for (int i = 0; i < dataTypes.length; i++) {
// ignore failed partial insert
@@ -521,7 +530,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
}
- /** Serialize bitmaps, ignoring failed time series */
+ /**
+ * Serialize bitmaps, ignoring failed time series
+ */
private void writeBitMaps(ByteBuffer buffer) {
ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), buffer);
if (bitMaps != null) {
@@ -541,7 +552,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
}
- /** Serialize bitmaps, ignoring failed time series */
+ /**
+ * Serialize bitmaps, ignoring failed time series
+ */
private void writeBitMaps(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), stream);
if (bitMaps != null) {
@@ -561,7 +574,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
}
- /** Serialize values, ignoring failed time series */
+ /**
+ * Serialize values, ignoring failed time series
+ */
private void writeValues(ByteBuffer buffer) {
for (int i = 0; i < columns.length; i++) {
// ignore failed partial insert
@@ -572,7 +587,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
}
- /** Serialize values, ignoring failed time series */
+ /**
+ * Serialize values, ignoring failed time series
+ */
private void writeValues(DataOutputStream stream) throws IOException {
for (int i = 0; i < columns.length; i++) {
// ignore failed partial insert
@@ -719,13 +736,18 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
// region serialize & deserialize methods for WAL
- /** Serialized size for wal */
+
+ /**
+ * Serialized size for wal
+ */
@Override
public int serializedSize() {
return serializedSize(0, rowCount);
}
- /** Serialized size for wal */
+ /**
+ * Serialized size for wal
+ */
public int serializedSize(int start, int end) {
return Short.BYTES + subSerializeSize(start, end);
}
@@ -822,7 +844,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
buffer.put((byte) (isAligned ? 1 : 0));
}
- /** Serialize measurement schemas, ignoring failed time series */
+ /**
+ * Serialize measurement schemas, ignoring failed time series
+ */
private void writeMeasurementSchemas(IWALByteBufferView buffer) {
buffer.putInt(measurements.length - getFailedMeasurementNumber());
serializeMeasurementSchemasToWAL(buffer);
@@ -835,7 +859,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
}
- /** Serialize bitmaps, ignoring failed time series */
+ /**
+ * Serialize bitmaps, ignoring failed time series
+ */
private void writeBitMaps(IWALByteBufferView buffer, int start, int end) {
buffer.put(BytesUtils.boolToByte(bitMaps != null));
if (bitMaps != null) {
@@ -858,7 +884,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
}
- /** Serialize values, ignoring failed time series */
+ /**
+ * Serialize values, ignoring failed time series
+ */
private void writeValues(IWALByteBufferView buffer, int start, int end) {
for (int i = 0; i < columns.length; i++) {
// ignore failed partial insert
@@ -914,7 +942,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
}
}
- /** Deserialize from wal */
+ /**
+ * Deserialize from wal
+ */
public static InsertTabletNode deserializeFromWAL(DataInputStream stream) throws IOException {
// we do not store plan node id in wal entry
InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
@@ -1000,9 +1030,15 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
InsertTabletNode that = (InsertTabletNode) o;
return rowCount == that.rowCount
&& Arrays.equals(times, that.times)