You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/22 13:22:53 UTC

[iotdb] branch master updated: [IOTDB-2982] Recover tsfile after datanode restart (#5643)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 39619ff619 [IOTDB-2982] Recover tsfile after datanode restart (#5643)
39619ff619 is described below

commit 39619ff619cf5829d97c7dc8fac58793b8191e3d
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Fri Apr 22 21:22:47 2022 +0800

    [IOTDB-2982] Recover tsfile after datanode restart (#5643)
---
 .../sql/planner/plan/node/write/InsertNode.java    | 27 +++++--------------
 .../sql/planner/plan/node/write/InsertRowNode.java |  1 +
 .../planner/plan/node/write/InsertTabletNode.java  |  1 +
 .../db/wal/recover/file/TsFilePlanRedoer.java      | 19 +++++---------
 .../apache/iotdb/db/wal/utils/WALWriteUtils.java   | 30 ++++++++++++++++++++++
 5 files changed, 45 insertions(+), 33 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 75de32d2c5..32520a924b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -140,32 +140,15 @@ public abstract class InsertNode extends WritePlanNode {
     this.deviceID = deviceID;
   }
 
-  public void serializeMeasurementSchemaToWAL(IWALByteBufferView buffer) {
+  protected void serializeMeasurementSchemaToWAL(IWALByteBufferView buffer) {
     for (MeasurementSchema measurementSchema : measurementSchemas) {
       if (measurementSchema != null) {
-        WALWriteUtils.write(measurementSchema.getMeasurementId(), buffer);
-
-        WALWriteUtils.write(measurementSchema.getType(), buffer);
-
-        WALWriteUtils.write(measurementSchema.getEncodingType(), buffer);
-
-        WALWriteUtils.write(measurementSchema.getCompressor(), buffer);
-
-        Map<String, String> props = measurementSchema.getProps();
-        if (props == null) {
-          WALWriteUtils.write(0, buffer);
-        } else {
-          WALWriteUtils.write(props.size(), buffer);
-          for (Map.Entry<String, String> entry : props.entrySet()) {
-            WALWriteUtils.write(entry.getKey(), buffer);
-            WALWriteUtils.write(entry.getValue(), buffer);
-          }
-        }
+        WALWriteUtils.write(measurementSchema, buffer);
       }
     }
   }
 
-  public int serializeMeasurementSchemaSize() {
+  protected int serializeMeasurementSchemaSize() {
     int byteLen = 0;
     for (MeasurementSchema measurementSchema : measurementSchemas) {
       if (measurementSchema != null) {
@@ -187,7 +170,7 @@ public abstract class InsertNode extends WritePlanNode {
   }
 
   /** Make sure the measurement schema is already inited before calling this */
-  public void deserializeMeasurementSchema(DataInputStream stream) throws IOException {
+  protected void deserializeMeasurementSchema(DataInputStream stream) throws IOException {
     for (int i = 0; i < measurementSchemas.length; i++) {
 
       measurementSchemas[i] =
@@ -209,6 +192,8 @@ public abstract class InsertNode extends WritePlanNode {
         }
         measurementSchemas[i].setProps(props);
       }
+
+      measurements[i] = measurementSchemas[i].getMeasurementId();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 32ed159e1a..e175434baf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -404,6 +404,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   void deserializeMeasurementsAndValues(DataInputStream stream) throws IOException {
     int measurementSize = stream.readInt();
 
+    this.measurements = new String[measurementSize];
     this.measurementSchemas = new MeasurementSchema[measurementSize];
     deserializeMeasurementSchema(stream);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index 15ed741d5d..df46ab029a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -739,6 +739,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     this.devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
 
     int measurementSize = stream.readInt();
+    this.measurements = new String[measurementSize];
     this.measurementSchemas = new MeasurementSchema[measurementSize];
     deserializeMeasurementSchema(stream);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index efd5cbf14a..1932f35704 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -154,18 +154,13 @@ public class TsFilePlanRedoer {
         return;
       }
     }
-    // TODO(getMeasurementSchema)
-    //    plan.setMeasurementMNodes(new IMeasurementMNode[plan.getMeasurements().length]);
-    //    try {
-    //      if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
-    //        idTable.getSeriesSchemas(plan);
-    //      } else {
-    //        IoTDB.schemaProcessor.getSeriesSchemasAndReadLockDevice(plan);
-    //        plan.setDeviceID(DeviceIDFactory.getInstance().getDeviceID(plan.getDevicePath()));
-    //      }
-    //    } catch (IOException | MetadataException e) {
-    //      throw new QueryProcessException("can't replay insert logs, ", e);
-    //    }
+
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
+      // TODO get device id by idTable
+      // idTable.getSeriesSchemas(node);
+    } else {
+      node.setDeviceID(DeviceIDFactory.getInstance().getDeviceID(node.getDevicePath()));
+    }
 
     if (node instanceof InsertRowNode) {
       if (node.isAligned()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
index 314af7074b..2fd1ae18b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
@@ -24,6 +24,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.Map;
 
 /** Like {@link org.apache.iotdb.tsfile.utils.ReadWriteIOUtils} */
 public class WALWriteUtils {
@@ -134,13 +137,40 @@ public class WALWriteUtils {
     return write(n, buffer);
   }
 
+  /** TSEncoding. */
   public static int write(TSEncoding encoding, IWALByteBufferView buffer) {
     byte n = encoding.serialize();
     return write(n, buffer);
   }
 
+  /** CompressionType. */
   public static int write(CompressionType compressionType, IWALByteBufferView buffer) {
     byte n = compressionType.serialize();
     return write(n, buffer);
   }
+
+  /** MeasurementSchema. */
+  public static int write(MeasurementSchema measurementSchema, IWALByteBufferView buffer) {
+    int len = 0;
+
+    len += write(measurementSchema.getMeasurementId(), buffer);
+
+    len += write(measurementSchema.getType(), buffer);
+
+    len += write(measurementSchema.getEncodingType(), buffer);
+
+    len += write(measurementSchema.getCompressor(), buffer);
+
+    Map<String, String> props = measurementSchema.getProps();
+    if (props == null) {
+      len += write(0, buffer);
+    } else {
+      len += write(props.size(), buffer);
+      for (Map.Entry<String, String> entry : props.entrySet()) {
+        len += write(entry.getKey(), buffer);
+        len += write(entry.getValue(), buffer);
+      }
+    }
+    return len;
+  }
 }