You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/22 13:22:53 UTC
[iotdb] branch master updated: [IOTDB-2982] Recover tsfile after datanode restart (#5643)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 39619ff619 [IOTDB-2982] Recover tsfile after datanode restart (#5643)
39619ff619 is described below
commit 39619ff619cf5829d97c7dc8fac58793b8191e3d
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Fri Apr 22 21:22:47 2022 +0800
[IOTDB-2982] Recover tsfile after datanode restart (#5643)
---
.../sql/planner/plan/node/write/InsertNode.java | 27 +++++--------------
.../sql/planner/plan/node/write/InsertRowNode.java | 1 +
.../planner/plan/node/write/InsertTabletNode.java | 1 +
.../db/wal/recover/file/TsFilePlanRedoer.java | 19 +++++---------
.../apache/iotdb/db/wal/utils/WALWriteUtils.java | 30 ++++++++++++++++++++++
5 files changed, 45 insertions(+), 33 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 75de32d2c5..32520a924b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -140,32 +140,15 @@ public abstract class InsertNode extends WritePlanNode {
this.deviceID = deviceID;
}
- public void serializeMeasurementSchemaToWAL(IWALByteBufferView buffer) {
+ protected void serializeMeasurementSchemaToWAL(IWALByteBufferView buffer) {
for (MeasurementSchema measurementSchema : measurementSchemas) {
if (measurementSchema != null) {
- WALWriteUtils.write(measurementSchema.getMeasurementId(), buffer);
-
- WALWriteUtils.write(measurementSchema.getType(), buffer);
-
- WALWriteUtils.write(measurementSchema.getEncodingType(), buffer);
-
- WALWriteUtils.write(measurementSchema.getCompressor(), buffer);
-
- Map<String, String> props = measurementSchema.getProps();
- if (props == null) {
- WALWriteUtils.write(0, buffer);
- } else {
- WALWriteUtils.write(props.size(), buffer);
- for (Map.Entry<String, String> entry : props.entrySet()) {
- WALWriteUtils.write(entry.getKey(), buffer);
- WALWriteUtils.write(entry.getValue(), buffer);
- }
- }
+ WALWriteUtils.write(measurementSchema, buffer);
}
}
}
- public int serializeMeasurementSchemaSize() {
+ protected int serializeMeasurementSchemaSize() {
int byteLen = 0;
for (MeasurementSchema measurementSchema : measurementSchemas) {
if (measurementSchema != null) {
@@ -187,7 +170,7 @@ public abstract class InsertNode extends WritePlanNode {
}
/** Make sure the measurement schema is already inited before calling this */
- public void deserializeMeasurementSchema(DataInputStream stream) throws IOException {
+ protected void deserializeMeasurementSchema(DataInputStream stream) throws IOException {
for (int i = 0; i < measurementSchemas.length; i++) {
measurementSchemas[i] =
@@ -209,6 +192,8 @@ public abstract class InsertNode extends WritePlanNode {
}
measurementSchemas[i].setProps(props);
}
+
+ measurements[i] = measurementSchemas[i].getMeasurementId();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 32ed159e1a..e175434baf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -404,6 +404,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
void deserializeMeasurementsAndValues(DataInputStream stream) throws IOException {
int measurementSize = stream.readInt();
+ this.measurements = new String[measurementSize];
this.measurementSchemas = new MeasurementSchema[measurementSize];
deserializeMeasurementSchema(stream);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index 15ed741d5d..df46ab029a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -739,6 +739,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
this.devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
int measurementSize = stream.readInt();
+ this.measurements = new String[measurementSize];
this.measurementSchemas = new MeasurementSchema[measurementSize];
deserializeMeasurementSchema(stream);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index efd5cbf14a..1932f35704 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -154,18 +154,13 @@ public class TsFilePlanRedoer {
return;
}
}
- // TODO(getMeasurementSchema)
- // plan.setMeasurementMNodes(new IMeasurementMNode[plan.getMeasurements().length]);
- // try {
- // if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
- // idTable.getSeriesSchemas(plan);
- // } else {
- // IoTDB.schemaProcessor.getSeriesSchemasAndReadLockDevice(plan);
- // plan.setDeviceID(DeviceIDFactory.getInstance().getDeviceID(plan.getDevicePath()));
- // }
- // } catch (IOException | MetadataException e) {
- // throw new QueryProcessException("can't replay insert logs, ", e);
- // }
+
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
+ // TODO get device id by idTable
+ // idTable.getSeriesSchemas(node);
+ } else {
+ node.setDeviceID(DeviceIDFactory.getInstance().getDeviceID(node.getDevicePath()));
+ }
if (node instanceof InsertRowNode) {
if (node.isAligned()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
index 314af7074b..2fd1ae18b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
@@ -24,6 +24,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.Map;
/** Like {@link org.apache.iotdb.tsfile.utils.ReadWriteIOUtils} */
public class WALWriteUtils {
@@ -134,13 +137,40 @@ public class WALWriteUtils {
return write(n, buffer);
}
+ /** TSEncoding. */
public static int write(TSEncoding encoding, IWALByteBufferView buffer) {
byte n = encoding.serialize();
return write(n, buffer);
}
+ /** CompressionType. */
public static int write(CompressionType compressionType, IWALByteBufferView buffer) {
byte n = compressionType.serialize();
return write(n, buffer);
}
+
+ /** MeasurementSchema. */
+ public static int write(MeasurementSchema measurementSchema, IWALByteBufferView buffer) {
+ int len = 0;
+
+ len += write(measurementSchema.getMeasurementId(), buffer);
+
+ len += write(measurementSchema.getType(), buffer);
+
+ len += write(measurementSchema.getEncodingType(), buffer);
+
+ len += write(measurementSchema.getCompressor(), buffer);
+
+ Map<String, String> props = measurementSchema.getProps();
+ if (props == null) {
+ len += write(0, buffer);
+ } else {
+ len += write(props.size(), buffer);
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ len += write(entry.getKey(), buffer);
+ len += write(entry.getValue(), buffer);
+ }
+ }
+ return len;
+ }
}