You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/16 03:39:32 UTC

[iotdb] branch handle_redirection_during_dispatching updated: use preferred location to redirect

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch handle_redirection_during_dispatching
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/handle_redirection_during_dispatching by this push:
     new 268fae19a0 use preferred location to redirect
268fae19a0 is described below

commit 268fae19a0cd4f83be5ee1b24f11a4ece44e1726
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue May 16 11:42:26 2023 +0800

    use preferred location to redirect
---
 .../planner/plan/node/write/InsertTabletNode.java  | 82 ++++++++++++++++------
 1 file changed, 59 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index 09fa7c92b7..8348dbac45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -160,7 +160,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
   }
 
   @Override
-  public void addChild(PlanNode child) {}
+  public void addChild(PlanNode child) {
+  }
 
   @Override
   public PlanNode clone() {
@@ -239,12 +240,12 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
             .getDataRegionReplicaSetForWriting(devicePath.getFullPath(), timePartitionSlots);
 
     // collect redirectInfo
+    TRegionReplicaSet tRegionReplicaSet = dataRegionReplicaSets
+        .get(dataRegionReplicaSets.size() - 1);
+    int preferredLocation =
+        tRegionReplicaSet.isSetPreferredLocation() ? tRegionReplicaSet.preferredLocation : 0;
     analysis.addEndPointToRedirectNodeList(
-        dataRegionReplicaSets
-            .get(dataRegionReplicaSets.size() - 1)
-            .getDataNodeLocations()
-            .get(0)
-            .getClientRpcEndPoint());
+        tRegionReplicaSet.getDataNodeLocations().get(preferredLocation).getClientRpcEndPoint());
 
     Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
     for (int i = 0; i < dataRegionReplicaSets.size(); i++) {
@@ -447,7 +448,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     ReadWriteIOUtils.write((byte) (isAligned ? 1 : 0), stream);
   }
 
-  /** Serialize measurements or measurement schemas, ignoring failed time series */
+  /**
+   * Serialize measurements or measurement schemas, ignoring failed time series
+   */
   private void writeMeasurementsOrSchemas(ByteBuffer buffer) {
     ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), buffer);
     ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), buffer);
@@ -466,7 +469,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     }
   }
 
-  /** Serialize measurements or measurement schemas, ignoring failed time series */
+  /**
+   * Serialize measurements or measurement schemas, ignoring failed time series
+   */
   private void writeMeasurementsOrSchemas(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), stream);
     ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), stream);
@@ -485,7 +490,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     }
   }
 
-  /** Serialize data types, ignoring failed time series */
+  /**
+   * Serialize data types, ignoring failed time series
+   */
   private void writeDataTypes(ByteBuffer buffer) {
     for (int i = 0; i < dataTypes.length; i++) {
       // ignore failed partial insert
@@ -496,7 +503,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     }
   }
 
-  /** Serialize data types, ignoring failed time series */
+  /**
+   * Serialize data types, ignoring failed time series
+   */
   private void writeDataTypes(DataOutputStream stream) throws IOException {
     for (int i = 0; i < dataTypes.length; i++) {
       // ignore failed partial insert
@@ -521,7 +530,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     }
   }
 
-  /** Serialize bitmaps, ignoring failed time series */
+  /**
+   * Serialize bitmaps, ignoring failed time series
+   */
   private void writeBitMaps(ByteBuffer buffer) {
     ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), buffer);
     if (bitMaps != null) {
@@ -541,7 +552,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     }
   }
 
-  /** Serialize bitmaps, ignoring failed time series */
+  /**
+   * Serialize bitmaps, ignoring failed time series
+   */
   private void writeBitMaps(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), stream);
     if (bitMaps != null) {
@@ -561,7 +574,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     }
   }
 
-  /** Serialize values, ignoring failed time series */
+  /**
+   * Serialize values, ignoring failed time series
+   */
   private void writeValues(ByteBuffer buffer) {
     for (int i = 0; i < columns.length; i++) {
       // ignore failed partial insert
@@ -572,7 +587,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     }
   }
 
-  /** Serialize values, ignoring failed time series */
+  /**
+   * Serialize values, ignoring failed time series
+   */
   private void writeValues(DataOutputStream stream) throws IOException {
     for (int i = 0; i < columns.length; i++) {
       // ignore failed partial insert
@@ -719,13 +736,18 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
   }
 
   // region serialize & deserialize methods for WAL
-  /** Serialized size for wal */
+
+  /**
+   * Serialized size for wal
+   */
   @Override
   public int serializedSize() {
     return serializedSize(0, rowCount);
   }
 
-  /** Serialized size for wal */
+  /**
+   * Serialized size for wal
+   */
   public int serializedSize(int start, int end) {
     return Short.BYTES + subSerializeSize(start, end);
   }
@@ -822,7 +844,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     buffer.put((byte) (isAligned ? 1 : 0));
   }
 
-  /** Serialize measurement schemas, ignoring failed time series */
+  /**
+   * Serialize measurement schemas, ignoring failed time series
+   */
   private void writeMeasurementSchemas(IWALByteBufferView buffer) {
     buffer.putInt(measurements.length - getFailedMeasurementNumber());
     serializeMeasurementSchemasToWAL(buffer);
@@ -835,7 +859,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     }
   }
 
-  /** Serialize bitmaps, ignoring failed time series */
+  /**
+   * Serialize bitmaps, ignoring failed time series
+   */
   private void writeBitMaps(IWALByteBufferView buffer, int start, int end) {
     buffer.put(BytesUtils.boolToByte(bitMaps != null));
     if (bitMaps != null) {
@@ -858,7 +884,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     }
   }
 
-  /** Serialize values, ignoring failed time series */
+  /**
+   * Serialize values, ignoring failed time series
+   */
   private void writeValues(IWALByteBufferView buffer, int start, int end) {
     for (int i = 0; i < columns.length; i++) {
       // ignore failed partial insert
@@ -914,7 +942,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     }
   }
 
-  /** Deserialize from wal */
+  /**
+   * Deserialize from wal
+   */
   public static InsertTabletNode deserializeFromWAL(DataInputStream stream) throws IOException {
     // we do not store plan node id in wal entry
     InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
@@ -1000,9 +1030,15 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     InsertTabletNode that = (InsertTabletNode) o;
     return rowCount == that.rowCount
         && Arrays.equals(times, that.times)