You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/04 14:42:18 UTC
[iotdb] branch master updated: [IOTDB-3034] Partial insert in new cluster (#5763)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 83590e9f08 [IOTDB-3034] Partial insert in new cluster (#5763)
83590e9f08 is described below
commit 83590e9f08b8f517fd185b17bfa5ad484db523f1
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Wed May 4 22:42:11 2022 +0800
[IOTDB-3034] Partial insert in new cluster (#5763)
---
.../iotdb/db/engine/memtable/AbstractMemTable.java | 28 +-
.../iotdb/db/engine/storagegroup/DataRegion.java | 14 +
.../iotdb/db/mpp/plan/analyze/SchemaValidator.java | 2 +-
.../plan/node/write/InsertMultiTabletsNode.java | 11 +-
.../plan/planner/plan/node/write/InsertNode.java | 176 +++++---
.../planner/plan/node/write/InsertRowNode.java | 378 ++++++++--------
.../planner/plan/node/write/InsertRowsNode.java | 11 +-
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 11 +-
.../planner/plan/node/write/InsertTabletNode.java | 482 +++++++++++----------
.../service/thrift/impl/InternalServiceImpl.java | 4 +-
.../java/org/apache/iotdb/db/utils/MemUtils.java | 6 +-
.../apache/iotdb/db/wal/utils/WALWriteUtils.java | 17 +
.../write/InsertMultiTabletsNodeSerdeTest.java | 10 +-
.../plan/node/write/InsertRowNodeSerdeTest.java | 17 +-
.../plan/node/write/InsertRowsNodeSerdeTest.java | 2 +-
.../write/InsertRowsOfOneDeviceNodeSerdeTest.java | 2 +-
.../plan/node/write/InsertTabletNodeSerdeTest.java | 16 +-
.../org/apache/iotdb/db/wal/io/WALFileTest.java | 114 +++++
.../java/org/apache/iotdb/tsfile/utils/BitMap.java | 23 +
19 files changed, 752 insertions(+), 572 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index ddd98b3987..cf045e3fb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -221,10 +221,17 @@ public abstract class AbstractMemTable implements IMemTable {
List<IMeasurementSchema> schemaList = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
+ int nullPointsNumber = 0;
for (int i = 0; i < insertRowNode.getMeasurements().length; i++) {
+ // use measurements[i] to ignore failed partial insert
if (measurements[i] == null) {
continue;
}
+ // use values[i] to ignore null value
+ if (values[i] == null) {
+ nullPointsNumber++;
+ continue;
+ }
IMeasurementSchema schema = insertRowNode.getMeasurementSchemas()[i];
schemaList.add(schema);
dataTypes.add(schema.getType());
@@ -232,7 +239,10 @@ public abstract class AbstractMemTable implements IMemTable {
memSize += MemUtils.getRecordsSize(dataTypes, values, disableMemControl);
write(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values);
- int pointsInserted = insertRowNode.getMeasurements().length;
+ int pointsInserted =
+ insertRowNode.getMeasurements().length
+ - insertRowNode.getFailedMeasurementNumber()
+ - nullPointsNumber;
totalPointsNum += pointsInserted;
@@ -295,16 +305,17 @@ public abstract class AbstractMemTable implements IMemTable {
public void insertAlignedRow(InsertRowNode insertRowNode) {
// if this insert node isn't from storage engine, we should set a temp device id for it
if (insertRowNode.getDeviceID() == null) {
- insertRowNode.setDeviceID(
- DeviceIDFactory.getInstance().getDeviceID(insertRowNode.getDevicePath()));
+ insertRowNode.setDeviceID(deviceIDFactory.getDeviceID(insertRowNode.getDevicePath()));
}
- // updatePlanIndexes(insertRowNode.getIndex());
+ // TODO updatePlanIndexes(insertRowNode.getIndex());
updatePlanIndexes(0);
String[] measurements = insertRowNode.getMeasurements();
+ Object[] values = insertRowNode.getValues();
List<IMeasurementSchema> schemaList = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
for (int i = 0; i < insertRowNode.getMeasurements().length; i++) {
+ // use measurements[i] to ignore failed partial insert
if (measurements[i] == null) {
continue;
}
@@ -315,13 +326,8 @@ public abstract class AbstractMemTable implements IMemTable {
if (schemaList.isEmpty()) {
return;
}
- memSize +=
- MemUtils.getAlignedRecordsSize(dataTypes, insertRowNode.getValues(), disableMemControl);
- writeAlignedRow(
- insertRowNode.getDeviceID(),
- schemaList,
- insertRowNode.getTime(),
- insertRowNode.getValues());
+ memSize += MemUtils.getAlignedRecordsSize(dataTypes, values, disableMemControl);
+ writeAlignedRow(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values);
int pointsInserted = insertRowNode.getMeasurements().length;
totalPointsNum += pointsInserted;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index ce0644859e..7112596005 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -866,6 +866,13 @@ public class DataRegion {
} finally {
writeUnlock();
}
+
+ if (insertRowNode.hasFailedMeasurements()) {
+ logger.warn(
+ "Fail to insert measurements {} caused by {}",
+ insertRowNode.getFailedMeasurements(),
+ insertRowNode.getFailedMessages());
+ }
}
/**
@@ -1081,6 +1088,13 @@ public class DataRegion {
} finally {
writeUnlock();
}
+
+ if (insertTabletNode.hasFailedMeasurements()) {
+ logger.warn(
+ "Fail to insert measurements {} caused by {}",
+ insertTabletNode.getFailedMeasurements(),
+ insertTabletNode.getFailedMessages());
+ }
}
/** @return whether the given time falls in ttl */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
index 748257c328..c0f5b5f412 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
@@ -48,7 +48,7 @@ public class SchemaValidator {
insertNode.isAligned());
}
- if (!insertNode.validateSchema(schemaTree)) {
+ if (!insertNode.validateAndSetSchema(schemaTree)) {
throw new SemanticException("Data type mismatch");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 1f9e324358..13cf231923 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -117,9 +117,9 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
}
@Override
- public boolean validateSchema(SchemaTree schemaTree) {
+ public boolean validateAndSetSchema(SchemaTree schemaTree) {
for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
- if (!insertTabletNode.validateSchema(schemaTree)) {
+ if (!insertTabletNode.validateAndSetSchema(schemaTree)) {
return false;
}
}
@@ -179,13 +179,6 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
return null;
}
- @Override
- public void setMeasurementSchemas(SchemaTree schemaTree) {
- for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
- insertTabletNode.setMeasurementSchemas(schemaTree);
- }
- }
-
@Override
public List<PartialPath> getDevicePaths() {
List<PartialPath> partialPaths = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 45b54aa979..48252bbab7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -20,27 +20,27 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
-import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
public abstract class InsertNode extends WritePlanNode {
@@ -57,6 +57,9 @@ public abstract class InsertNode extends WritePlanNode {
// TODO(INSERT) need to change it to a function handle to update last time value
// protected IMeasurementMNode[] measurementMNodes;
+ /** index of failed measurements -> info including measurement, data type and value */
+ protected Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info;
+
/**
* device id reference, for reuse device id in both id table and memtable <br>
* used in memtable
@@ -135,55 +138,37 @@ public abstract class InsertNode extends WritePlanNode {
this.deviceID = deviceID;
}
- protected void serializeMeasurementSchemaToWAL(IWALByteBufferView buffer) {
- for (MeasurementSchema measurementSchema : measurementSchemas) {
- WALWriteUtils.write(measurementSchema, buffer);
- }
- }
-
- protected int serializeMeasurementSchemaSize() {
+ /** Serialized size of measurement schemas, ignoring failed time series */
+ protected int serializeMeasurementSchemasSize() {
int byteLen = 0;
- for (MeasurementSchema measurementSchema : measurementSchemas) {
- byteLen += ReadWriteIOUtils.sizeToWrite(measurementSchema.getMeasurementId());
- byteLen += 3 * Byte.BYTES;
- Map<String, String> props = measurementSchema.getProps();
- if (props == null) {
- byteLen += Integer.BYTES;
- } else {
- byteLen += Integer.BYTES;
- for (Map.Entry<String, String> entry : props.entrySet()) {
- byteLen += ReadWriteIOUtils.sizeToWrite(entry.getKey());
- byteLen += ReadWriteIOUtils.sizeToWrite(entry.getValue());
- }
+ for (int i = 0; i < measurements.length; i++) {
+ // ignore failed partial insert
+ if (measurements[i] == null) {
+ continue;
}
+ byteLen += WALWriteUtils.sizeToWrite(measurementSchemas[i]);
}
return byteLen;
}
- /** Make sure the measurement schema is already inited before calling this */
- protected void deserializeMeasurementSchema(DataInputStream stream) throws IOException {
- for (int i = 0; i < measurementSchemas.length; i++) {
-
- measurementSchemas[i] =
- new MeasurementSchema(
- ReadWriteIOUtils.readString(stream),
- TSDataType.deserialize(ReadWriteIOUtils.readByte(stream)),
- TSEncoding.deserialize(ReadWriteIOUtils.readByte(stream)),
- CompressionType.deserialize(ReadWriteIOUtils.readByte(stream)));
-
- int size = ReadWriteIOUtils.readInt(stream);
- if (size > 0) {
- Map<String, String> props = new HashMap<>();
- String key;
- String value;
- for (int j = 0; j < size; j++) {
- key = ReadWriteIOUtils.readString(stream);
- value = ReadWriteIOUtils.readString(stream);
- props.put(key, value);
- }
- measurementSchemas[i].setProps(props);
+ /** Serialize measurement schemas, ignoring failed time series */
+ protected void serializeMeasurementSchemasToWAL(IWALByteBufferView buffer) {
+ for (int i = 0; i < measurements.length; i++) {
+ // ignore failed partial insert
+ if (measurements[i] == null) {
+ continue;
}
+ WALWriteUtils.write(measurementSchemas[i], buffer);
+ }
+ }
+ /**
+ * Deserialize measurement schemas. Make sure the measurement schemas and measurements have been
+ * created before calling this
+ */
+ protected void deserializeMeasurementSchemas(DataInputStream stream) throws IOException {
+ for (int i = 0; i < measurementSchemas.length; i++) {
+ measurementSchemas[i] = MeasurementSchema.deserializeFrom(stream);
measurements[i] = measurementSchemas[i].getMeasurementId();
}
}
@@ -192,33 +177,96 @@ public abstract class InsertNode extends WritePlanNode {
return dataRegionReplicaSet;
}
- public abstract boolean validateSchema(SchemaTree schemaTree);
+ public abstract boolean validateAndSetSchema(SchemaTree schemaTree);
- public void setMeasurementSchemas(SchemaTree schemaTree) {
- DeviceSchemaInfo deviceSchemaInfo =
- schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
- measurementSchemas =
- deviceSchemaInfo.getMeasurementSchemaList().toArray(new MeasurementSchema[0]);
+ /** Check whether data types are matched with measurement schemas */
+ protected boolean selfCheckDataTypes() {
+ for (int i = 0; i < measurementSchemas.length; i++) {
+ if (dataTypes[i] != measurementSchemas[i].getType()) {
+ if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ return false;
+ } else {
+ markFailedMeasurement(
+ i,
+ new DataTypeMismatchException(
+ devicePath.getFullPath(),
+ measurements[i],
+ measurementSchemas[i].getType(),
+ dataTypes[i]));
+ }
+ }
+ }
+ return true;
}
+ // region partial insert
/**
- * This method is overrided in InsertRowPlan and InsertTabletPlan. After marking failed
- * measurements, the failed values or columns would be null as well. We'd better use
- * "measurements[index] == null" to determine if the measurement failed.
+ * Mark failed measurement, measurements[index], dataTypes[index] and values/columns[index] would
+ * be null. We'd better use "measurements[index] == null" to determine if the measurement failed.
+ * <br>
+ * This method is not concurrency-safe.
*
* @param index failed measurement index
+ * @param cause cause Exception of failure
*/
- public void markFailedMeasurementInsertion(int index, Exception e) {
- // todo partial insert
- if (measurements[index] == null) {
- return;
+ public void markFailedMeasurement(int index, Exception cause) {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean hasFailedMeasurements() {
+ return failedMeasurementIndex2Info != null && !failedMeasurementIndex2Info.isEmpty();
+ }
+
+ public int getFailedMeasurementNumber() {
+ return failedMeasurementIndex2Info == null ? 0 : failedMeasurementIndex2Info.size();
+ }
+
+ public List<String> getFailedMeasurements() {
+ return failedMeasurementIndex2Info == null
+ ? Collections.emptyList()
+ : failedMeasurementIndex2Info.values().stream()
+ .map(info -> info.measurement)
+ .collect(Collectors.toList());
+ }
+
+ public List<Exception> getFailedExceptions() {
+ return failedMeasurementIndex2Info == null
+ ? Collections.emptyList()
+ : failedMeasurementIndex2Info.values().stream()
+ .map(info -> info.cause)
+ .collect(Collectors.toList());
+ }
+
+ public List<String> getFailedMessages() {
+ return failedMeasurementIndex2Info == null
+ ? Collections.emptyList()
+ : failedMeasurementIndex2Info.values().stream()
+ .map(
+ info -> {
+ Throwable cause = info.cause;
+ while (cause.getCause() != null) {
+ cause = cause.getCause();
+ }
+ return cause.getMessage();
+ })
+ .collect(Collectors.toList());
+ }
+
+ protected static class FailedMeasurementInfo {
+ protected String measurement;
+ protected TSDataType dataType;
+ protected Object value;
+ protected Exception cause;
+
+ public FailedMeasurementInfo(
+ String measurement, TSDataType dataType, Object value, Exception cause) {
+ this.measurement = measurement;
+ this.dataType = dataType;
+ this.value = value;
+ this.cause = cause;
}
- // if (failedMeasurements == null) {
- // failedMeasurements = new ArrayList<>();
- // }
- // failedMeasurements.add(measurements[index]);
- measurements[index] = null;
}
+ // endregion
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index f63ee885e4..61a8081909 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngineV2;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
@@ -54,6 +53,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Objects;
@@ -165,159 +165,90 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
}
@Override
- public boolean validateSchema(SchemaTree schemaTree) {
+ public boolean validateAndSetSchema(SchemaTree schemaTree) {
DeviceSchemaInfo deviceSchemaInfo =
schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
-
- List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
-
- if (isNeedInferType) {
- try {
- transferType(measurementSchemas);
- } catch (QueryProcessException e) {
- return false;
- }
- } else {
- // todo partial insert
- if (deviceSchemaInfo.isAligned() != isAligned) {
- return false;
- }
-
- for (int i = 0; i < measurementSchemas.size(); i++) {
- if (dataTypes[i] != measurementSchemas.get(i).getType()) {
- if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
- return false;
- } else {
- markFailedMeasurementInsertion(
- i,
- new DataTypeMismatchException(
- devicePath.getFullPath(),
- measurements[i],
- measurementSchemas.get(i).getType(),
- dataTypes[i]));
- }
- }
- }
+ if (deviceSchemaInfo.isAligned() != isAligned) {
+ return false;
}
+ this.measurementSchemas =
+ deviceSchemaInfo.getMeasurementSchemaList().toArray(new MeasurementSchema[0]);
- // filter failed measurements
- measurements = Arrays.stream(measurements).filter(Objects::nonNull).toArray(String[]::new);
- dataTypes = Arrays.stream(dataTypes).filter(Objects::nonNull).toArray(TSDataType[]::new);
- values = Arrays.stream(values).filter(Objects::nonNull).toArray(Object[]::new);
-
- return true;
- }
-
- @Override
- public void markFailedMeasurementInsertion(int index, Exception e) {
- if (measurements[index] == null) {
- return;
+ // transfer data types from string values when necessary
+ try {
+ transferType();
+ } catch (QueryProcessException e) {
+ return false;
}
- super.markFailedMeasurementInsertion(index, e);
- values[index] = null;
- dataTypes[index] = null;
+
+ // validate whether data types are matched
+ return selfCheckDataTypes();
}
/**
- * if inferType is true, transfer String[] values to specific data types (Integer, Long, Float,
- * Double, Binary)
+ * transfer String[] values to specific data types when isNeedInferType is true. <br>
+ * Notice: measurementSchemas must be initialized before calling this method
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- public void transferType(List<MeasurementSchema> measurementSchemas)
- throws QueryProcessException {
- if (isNeedInferType) {
- for (int i = 0; i < measurementSchemas.size(); i++) {
- if (measurementSchemas.get(i) == null) {
- if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
- markFailedMeasurementInsertion(
- i,
- new QueryProcessException(
- new PathNotExistException(
- devicePath.getFullPath()
- + IoTDBConstant.PATH_SEPARATOR
- + measurements[i])));
- } else {
- throw new QueryProcessException(
- new PathNotExistException(
- devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
- }
- continue;
- }
+ private void transferType() throws QueryProcessException {
+ if (!isNeedInferType) {
+ return;
+ }
- dataTypes[i] = measurementSchemas.get(i).getType();
- try {
- values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
- } catch (Exception e) {
- logger.warn(
- "{}.{} data type is not consistent, input {}, registered {}",
- devicePath,
- measurements[i],
- values[i],
- dataTypes[i]);
- if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
- markFailedMeasurementInsertion(i, e);
- } else {
- throw e;
- }
+ for (int i = 0; i < measurementSchemas.length; i++) {
+ // null when time series doesn't exist
+ if (measurementSchemas[i] == null) {
+ if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ throw new QueryProcessException(
+ new PathNotExistException(
+ devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
+ } else {
+ markFailedMeasurement(
+ i,
+ new QueryProcessException(
+ new PathNotExistException(
+ devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i])));
+ }
+ continue;
+ }
+ // parse string value to specific type
+ dataTypes[i] = measurementSchemas[i].getType();
+ try {
+ values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
+ } catch (Exception e) {
+ logger.warn(
+ "{}.{} data type is not consistent, input {}, registered {}",
+ devicePath,
+ measurements[i],
+ values[i],
+ dataTypes[i]);
+ if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+ throw e;
+ } else {
+ markFailedMeasurement(i, e);
}
}
- isNeedInferType = false;
}
+ isNeedInferType = false;
}
@Override
- public int serializedSize() {
- int size = 0;
- size += Short.BYTES;
- return size + subSerializeSize();
- }
-
- int subSerializeSize() {
- int size = 0;
- size += Long.BYTES;
- size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
- return size + serializeMeasurementsAndValuesSize();
- }
-
- int serializeMeasurementsAndValuesSize() {
- int size = 0;
- size += Integer.BYTES;
-
- size += serializeMeasurementSchemaSize();
+ public void markFailedMeasurement(int index, Exception cause) {
+ if (measurements[index] == null) {
+ return;
+ }
- // putValues
- for (int i = 0; i < values.length; i++) {
- if (dataTypes[i] != null) {
- if (values[i] == null) {
- size += Byte.BYTES;
- continue;
- }
- size += Byte.BYTES;
- switch (dataTypes[i]) {
- case BOOLEAN:
- size += Byte.BYTES;
- break;
- case INT32:
- size += Integer.BYTES;
- break;
- case INT64:
- size += Long.BYTES;
- break;
- case FLOAT:
- size += Float.BYTES;
- break;
- case DOUBLE:
- size += Double.BYTES;
- break;
- case TEXT:
- size += ReadWriteIOUtils.sizeToWrite((Binary) values[i]);
- break;
- }
- }
+ if (failedMeasurementIndex2Info == null) {
+ failedMeasurementIndex2Info = new HashMap<>();
}
- size += Byte.BYTES;
- return size;
+ FailedMeasurementInfo failedMeasurementInfo =
+ new FailedMeasurementInfo(measurements[index], dataTypes[index], values[index], cause);
+ failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
+
+ measurements[index] = null;
+ dataTypes[index] = null;
+ values[index] = null;
}
@Override
@@ -332,40 +263,47 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
serializeMeasurementsAndValues(buffer);
}
+ /** Serialize measurements and values, ignoring failed time series */
void serializeMeasurementsAndValues(ByteBuffer buffer) {
- buffer.putInt(measurements.length);
+ buffer.putInt(measurements.length - getFailedMeasurementNumber());
+ serializeMeasurementsOrSchemas(buffer);
+ putDataTypesAndValues(buffer);
+ buffer.put((byte) (isNeedInferType ? 1 : 0));
+ buffer.put((byte) (isAligned ? 1 : 0));
+ }
- // check whether has measurement schemas or not
+ /** Serialize measurements or measurement schemas, ignoring failed time series */
+ private void serializeMeasurementsOrSchemas(ByteBuffer buffer) {
buffer.put((byte) (measurementSchemas != null ? 1 : 0));
- if (measurementSchemas != null) {
- for (MeasurementSchema measurementSchema : measurementSchemas) {
- measurementSchema.serializeTo(buffer);
+ for (int i = 0; i < measurements.length; i++) {
+ // ignore failed partial insert
+ if (measurements[i] == null) {
+ continue;
}
- } else {
- for (String measurement : measurements) {
- ReadWriteIOUtils.write(measurement, buffer);
+ // serialize measurement schemas when exist
+ if (measurementSchemas != null) {
+ measurementSchemas[i].serializeTo(buffer);
+ } else {
+ ReadWriteIOUtils.write(measurements[i], buffer);
}
}
-
- try {
- putValues(buffer);
- } catch (QueryProcessException e) {
- logger.error("Failed to serialize values for {}", this, e);
- }
-
- buffer.put((byte) (isNeedInferType ? 1 : 0));
- buffer.put((byte) (isAligned ? 1 : 0));
}
- private void putValues(ByteBuffer buffer) throws QueryProcessException {
+ /** Serialize data types and values, ignoring failed time series */
+ private void putDataTypesAndValues(ByteBuffer buffer) {
for (int i = 0; i < values.length; i++) {
+ // ignore failed partial insert
+ if (measurements[i] == null) {
+ continue;
+ }
+ // serialize null value
if (values[i] == null) {
ReadWriteIOUtils.write(TYPE_NULL, buffer);
continue;
}
// types are not determined, the situation mainly occurs when the plan uses string values
// and is forwarded to other nodes
- if (dataTypes == null || dataTypes[i] == null) {
+ if (isNeedInferType) {
ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer);
ReadWriteIOUtils.write(values[i].toString(), buffer);
} else {
@@ -390,7 +328,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
ReadWriteIOUtils.write((Binary) values[i], buffer);
break;
default:
- throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+ throw new RuntimeException("Unsupported data type:" + dataTypes[i]);
}
}
}
@@ -404,7 +342,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
return insertNode;
}
- public void subDeserialize(ByteBuffer byteBuffer) {
+ void subDeserialize(ByteBuffer byteBuffer) {
time = byteBuffer.getLong();
try {
devicePath = new PartialPath(ReadWriteIOUtils.readString(byteBuffer));
@@ -420,7 +358,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
this.measurements = new String[measurementSize];
boolean hasSchema = buffer.get() == 1;
if (hasSchema) {
- this.measurementSchemas = new MeasurementSchema[measurementSize];
+ measurementSchemas = new MeasurementSchema[measurementSize];
for (int i = 0; i < measurementSize; i++) {
measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
measurements[i] = measurementSchemas[i].getMeasurementId();
@@ -431,20 +369,16 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
}
}
- this.dataTypes = new TSDataType[measurementSize];
- this.values = new Object[measurementSize];
- try {
- fillValues(buffer);
- } catch (QueryProcessException e) {
- e.printStackTrace();
- }
+ dataTypes = new TSDataType[measurementSize];
+ values = new Object[measurementSize];
+ fillDataTypesAndValues(buffer);
isNeedInferType = buffer.get() == 1;
isAligned = buffer.get() == 1;
}
- /** Make sure the values is already inited before calling this */
- public void fillValues(ByteBuffer buffer) throws QueryProcessException {
+ /** Make sure the dataTypes and values have been created before calling this */
+ private void fillDataTypesAndValues(ByteBuffer buffer) {
for (int i = 0; i < dataTypes.length; i++) {
// types are not determined, the situation mainly occurs when the node uses string values
// and is forwarded to other nodes
@@ -474,40 +408,97 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
values[i] = ReadWriteIOUtils.readBinary(buffer);
break;
default:
- throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+ throw new RuntimeException("Unsupported data type:" + dataTypes[i]);
}
}
}
+ // region serialize & deserialize methods for WAL
+ /** Serialized size for wal */
+ @Override
+ public int serializedSize() {
+ return Short.BYTES + subSerializeSize();
+ }
+
+ private int subSerializeSize() {
+ int size = 0;
+ size += Long.BYTES;
+ size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
+ return size + serializeMeasurementsAndValuesSize();
+ }
+
+ private int serializeMeasurementsAndValuesSize() {
+ int size = 0;
+ size += Integer.BYTES;
+
+ size += serializeMeasurementSchemasSize();
+
+ // putValues
+ for (int i = 0; i < values.length; i++) {
+ // ignore failed partial insert
+ if (measurements[i] == null) {
+ continue;
+ }
+ // serialize null value
+ if (values[i] == null) {
+ size += Byte.BYTES;
+ continue;
+ }
+ size += Byte.BYTES;
+ switch (dataTypes[i]) {
+ case BOOLEAN:
+ size += Byte.BYTES;
+ break;
+ case INT32:
+ size += Integer.BYTES;
+ break;
+ case INT64:
+ size += Long.BYTES;
+ break;
+ case FLOAT:
+ size += Float.BYTES;
+ break;
+ case DOUBLE:
+ size += Double.BYTES;
+ break;
+ case TEXT:
+ size += ReadWriteIOUtils.sizeToWrite((Binary) values[i]);
+ break;
+ }
+ }
+
+ size += Byte.BYTES;
+ return size;
+ }
+
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
buffer.putShort((short) PlanNodeType.INSERT_ROW.ordinal());
subSerialize(buffer);
}
- void subSerialize(IWALByteBufferView buffer) {
+ private void subSerialize(IWALByteBufferView buffer) {
buffer.putLong(time);
WALWriteUtils.write(devicePath.getFullPath(), buffer);
serializeMeasurementsAndValues(buffer);
}
- void serializeMeasurementsAndValues(IWALByteBufferView buffer) {
- buffer.putInt(measurementSchemas.length);
-
- serializeMeasurementSchemaToWAL(buffer);
-
- try {
- putValues(buffer);
- } catch (QueryProcessException e) {
- logger.error("Failed to serialize values for {}", this, e);
- }
-
+ /** Serialize measurements and values, ignoring failed time series */
+ private void serializeMeasurementsAndValues(IWALByteBufferView buffer) {
+ buffer.putInt(measurementSchemas.length - getFailedMeasurementNumber());
+ serializeMeasurementSchemasToWAL(buffer);
+ putDataTypesAndValues(buffer);
buffer.put((byte) (isAligned ? 1 : 0));
}
- private void putValues(IWALByteBufferView buffer) throws QueryProcessException {
- // todo remove serialize datatype after serializing measurement schema
+ /** Serialize data types and values, ignoring failed time series */
+ private void putDataTypesAndValues(IWALByteBufferView buffer) {
for (int i = 0; i < values.length; i++) {
+ // ignore failed partial insert
+ if (measurements[i] == null) {
+ continue;
+ }
+ // serialize null value
if (values[i] == null) {
WALWriteUtils.write(TYPE_NULL, buffer);
continue;
@@ -533,14 +524,14 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
WALWriteUtils.write((Binary) values[i], buffer);
break;
default:
- throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+ throw new RuntimeException("Unsupported data type:" + dataTypes[i]);
}
}
}
+ /** Deserialize from wal */
public static InsertRowNode deserialize(DataInputStream stream)
throws IOException, IllegalPathException {
- // This method is used for deserialize from wal
// we do not store plan node id in wal entry
InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
insertNode.setTime(stream.readLong());
@@ -553,23 +544,19 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
void deserializeMeasurementsAndValues(DataInputStream stream) throws IOException {
int measurementSize = stream.readInt();
- this.measurements = new String[measurementSize];
- this.measurementSchemas = new MeasurementSchema[measurementSize];
- deserializeMeasurementSchema(stream);
+ measurements = new String[measurementSize];
+ measurementSchemas = new MeasurementSchema[measurementSize];
+ deserializeMeasurementSchemas(stream);
- this.dataTypes = new TSDataType[measurementSize];
- this.values = new Object[measurementSize];
- try {
- fillValues(stream);
- } catch (QueryProcessException e) {
- e.printStackTrace();
- }
+ dataTypes = new TSDataType[measurementSize];
+ values = new Object[measurementSize];
+ fillDataTypesAndValues(stream);
isAligned = stream.readByte() == 1;
}
- /** Make sure the values is already inited before calling this */
- public void fillValues(DataInputStream stream) throws QueryProcessException, IOException {
+ /** Make sure the dataTypes and values have been created before calling this */
+ public void fillDataTypesAndValues(DataInputStream stream) throws IOException {
for (int i = 0; i < dataTypes.length; i++) {
byte typeNum = stream.readByte();
if (typeNum == TYPE_NULL) {
@@ -596,10 +583,11 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
values[i] = ReadWriteIOUtils.readBinary(stream);
break;
default:
- throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+ throw new RuntimeException("Unsupported data type:" + dataTypes[i]);
}
}
}
+ // endregion
@Override
public boolean equals(Object o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index f9dd6c9359..5ca6141a8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -103,22 +103,15 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
public void addChild(PlanNode child) {}
@Override
- public boolean validateSchema(SchemaTree schemaTree) {
+ public boolean validateAndSetSchema(SchemaTree schemaTree) {
for (InsertRowNode insertRowNode : insertRowNodeList) {
- if (!insertRowNode.validateSchema(schemaTree)) {
+ if (!insertRowNode.validateAndSetSchema(schemaTree)) {
return false;
}
}
return true;
}
- @Override
- public void setMeasurementSchemas(SchemaTree schemaTree) {
- for (InsertRowNode insertRowNode : insertRowNodeList) {
- insertRowNode.setMeasurementSchemas(schemaTree);
- }
- }
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 24432e6064..eaa8d2f2a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -138,22 +138,15 @@ public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsert
}
@Override
- public boolean validateSchema(SchemaTree schemaTree) {
+ public boolean validateAndSetSchema(SchemaTree schemaTree) {
for (InsertRowNode insertRowNode : insertRowNodeList) {
- if (!insertRowNode.validateSchema(schemaTree)) {
+ if (!insertRowNode.validateAndSetSchema(schemaTree)) {
return false;
}
}
return true;
}
- @Override
- public void setMeasurementSchemas(SchemaTree schemaTree) {
- for (InsertRowNode insertRowNode : insertRowNodeList) {
- insertRowNode.setMeasurementSchemas(schemaTree);
- }
- }
-
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
List<WritePlanNode> result = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index b7133a5ec7..24b560cc9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -23,9 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngineV2;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
@@ -164,38 +162,17 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
@Override
- public boolean validateSchema(SchemaTree schemaTree) {
+ public boolean validateAndSetSchema(SchemaTree schemaTree) {
DeviceSchemaInfo deviceSchemaInfo =
schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
-
- // todo partial insert
if (deviceSchemaInfo.isAligned() != isAligned) {
return false;
}
+ measurementSchemas =
+ deviceSchemaInfo.getMeasurementSchemaList().toArray(new MeasurementSchema[0]);
- List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
- for (int i = 0; i < measurementSchemas.size(); i++) {
- if (dataTypes[i] != measurementSchemas.get(i).getType()) {
- if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
- return false;
- } else {
- markFailedMeasurementInsertion(
- i,
- new DataTypeMismatchException(
- devicePath.getFullPath(),
- measurements[i],
- measurementSchemas.get(i).getType(),
- dataTypes[i]));
- }
- }
- }
-
- // filter failed measurements
- measurements = Arrays.stream(measurements).filter(Objects::nonNull).toArray(String[]::new);
- dataTypes = Arrays.stream(dataTypes).filter(Objects::nonNull).toArray(TSDataType[]::new);
- columns = Arrays.stream(columns).filter(Objects::nonNull).toArray(Object[]::new);
-
- return true;
+ // validate whether data types are matched
+ return selfCheckDataTypes();
}
@Override
@@ -316,91 +293,58 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
return result;
}
- @Override
- public void markFailedMeasurementInsertion(int index, Exception e) {
- if (measurements[index] == null) {
- return;
+ private Object[] initTabletValues(int columnSize, int rowSize, TSDataType[] dataTypes) {
+ Object[] values = new Object[columnSize];
+ for (int i = 0; i < values.length; i++) {
+ switch (dataTypes[i]) {
+ case TEXT:
+ values[i] = new Binary[rowSize];
+ break;
+ case FLOAT:
+ values[i] = new float[rowSize];
+ break;
+ case INT32:
+ values[i] = new int[rowSize];
+ break;
+ case INT64:
+ values[i] = new long[rowSize];
+ break;
+ case DOUBLE:
+ values[i] = new double[rowSize];
+ break;
+ case BOOLEAN:
+ values[i] = new boolean[rowSize];
+ break;
+ }
}
- super.markFailedMeasurementInsertion(index, e);
- dataTypes[index] = null;
- columns[index] = null;
- bitMaps[index] = null;
- }
-
- @Override
- public int serializedSize() {
- return serializedSize(0, rowCount);
+ return values;
}
- public int serializedSize(int start, int end) {
- int size = 0;
- size += Short.BYTES;
- return size + subSerializeSize(start, end);
+ private BitMap[] initBitmaps(int columnSize, int rowSize) {
+ BitMap[] bitMaps = new BitMap[columnSize];
+ for (int i = 0; i < columnSize; i++) {
+ bitMaps[i] = new BitMap(rowSize);
+ }
+ return bitMaps;
}
- int subSerializeSize(int start, int end) {
- int size = 0;
- size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
- // measurements size
- size += Integer.BYTES;
-
- size += serializeMeasurementSchemaSize();
-
- size += Byte.BYTES * dataTypes.length;
-
- // times size
- size += Integer.BYTES;
- size += Long.BYTES * (end - start);
- // bitmaps size
- size += Byte.BYTES;
- if (bitMaps != null) {
- for (BitMap bitMap : bitMaps) {
- size += Byte.BYTES;
- if (bitMap != null) {
- int len = end - start;
- BitMap partBitMap = new BitMap(len);
- BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
- size += partBitMap.getByteArray().length;
- }
- }
+ @Override
+ public void markFailedMeasurement(int index, Exception cause) {
+ if (measurements[index] == null) {
+ return;
}
- // values size
- for (int i = 0; i < dataTypes.length; i++) {
- if (columns[i] != null) {
- size += getColumnSize(dataTypes[i], columns[i], start, end);
- }
+
+ if (failedMeasurementIndex2Info == null) {
+ failedMeasurementIndex2Info = new HashMap<>();
}
- size += Byte.BYTES;
- return size;
- }
+ FailedMeasurementInfo failedMeasurementInfo =
+ new FailedMeasurementInfo(measurements[index], dataTypes[index], columns[index], cause);
+ failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
- private int getColumnSize(TSDataType dataType, Object column, int start, int end) {
- int size = 0;
- switch (dataType) {
- case INT32:
- size += Integer.BYTES * (end - start);
- break;
- case INT64:
- size += Long.BYTES * (end - start);
- break;
- case FLOAT:
- size += Float.BYTES * (end - start);
- break;
- case DOUBLE:
- size += Double.BYTES * (end - start);
- break;
- case BOOLEAN:
- size += Byte.BYTES * (end - start);
- break;
- case TEXT:
- Binary[] binaryValues = (Binary[]) column;
- for (int j = start; j < end; j++) {
- size += ReadWriteIOUtils.sizeToWrite(binaryValues[j]);
- }
- break;
- }
- return size;
+ measurements[index] = null;
+ dataTypes[index] = null;
+ columns[index] = null;
}
@Override
@@ -411,7 +355,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
void subSerialize(ByteBuffer buffer) {
ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
- writeMeasurements(buffer);
+ writeMeasurementsOrSchemas(buffer);
writeDataTypes(buffer);
writeTimes(buffer);
writeBitMaps(buffer);
@@ -419,26 +363,33 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
buffer.put((byte) (isAligned ? 1 : 0));
}
- private void writeMeasurements(ByteBuffer buffer) {
- buffer.putInt(measurements.length);
+ /** Serialize measurements or measurement schemas, ignoring failed time series */
+ private void writeMeasurementsOrSchemas(ByteBuffer buffer) {
+ buffer.putInt(measurements.length - getFailedMeasurementNumber());
- // check whether has measurement schemas or not
buffer.put((byte) (measurementSchemas != null ? 1 : 0));
-
- if (measurementSchemas != null) {
- for (MeasurementSchema measurement : measurementSchemas) {
- measurement.serializeTo(buffer);
+ for (int i = 0; i < measurements.length; i++) {
+ // ignore failed partial insert
+ if (measurements[i] == null) {
+ continue;
}
- } else {
- for (String measurement : measurements) {
- ReadWriteIOUtils.write(measurement, buffer);
+ // serialize measurement schemas when exist
+ if (measurementSchemas != null) {
+ measurementSchemas[i].serializeTo(buffer);
+ } else {
+ ReadWriteIOUtils.write(measurements[i], buffer);
}
}
}
+ /** Serialize data types, ignoring failed time series */
private void writeDataTypes(ByteBuffer buffer) {
- for (TSDataType dataType : dataTypes) {
- dataType.serializeTo(buffer);
+ for (int i = 0; i < dataTypes.length; i++) {
+ // ignore failed partial insert
+ if (measurements[i] == null) {
+ continue;
+ }
+ dataTypes[i].serializeTo(buffer);
}
}
@@ -449,23 +400,33 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
}
+ /** Serialize bitmaps, ignoring failed time series */
private void writeBitMaps(ByteBuffer buffer) {
buffer.put(BytesUtils.boolToByte(bitMaps != null));
if (bitMaps != null) {
- for (int i = 0; i < measurements.length; i++) {
- BitMap bitMap = bitMaps[i];
- if (bitMap == null) {
+ for (int i = 0; i < bitMaps.length; i++) {
+ // ignore failed partial insert
+ if (measurements[i] == null) {
+ continue;
+ }
+
+ if (bitMaps[i] == null) {
buffer.put(BytesUtils.boolToByte(false));
} else {
buffer.put(BytesUtils.boolToByte(true));
- buffer.put(bitMap.getByteArray());
+ buffer.put(bitMaps[i].getByteArray());
}
}
}
}
+ /** Serialize values, ignoring failed time series */
private void writeValues(ByteBuffer buffer) {
- for (int i = 0; i < dataTypes.length; i++) {
+ for (int i = 0; i < columns.length; i++) {
+ // ignore failed partial insert
+ if (measurements[i] == null) {
+ continue;
+ }
serializeColumn(dataTypes[i], columns[i], buffer);
}
}
@@ -514,6 +475,132 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
}
+ /**
+  * Rebuilds an {@link InsertTabletNode} from a buffer. The node body is consumed first through
+  * {@link #subDeserialize(ByteBuffer)}; the plan node id is read after it.
+  *
+  * @param byteBuffer buffer positioned at the start of the serialized node body
+  * @return the reconstructed node
+  */
+ public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
+   // placeholder id, replaced below once the body has been consumed
+   InsertTabletNode tabletNode = new InsertTabletNode(new PlanNodeId(""));
+   tabletNode.subDeserialize(byteBuffer);
+   tabletNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
+   return tabletNode;
+ }
+
+ /**
+  * Populates this node from {@code buffer}. Fields are read in this order: device path,
+  * measurement count, a flag byte saying whether measurement schemas or plain measurement names
+  * follow, those schemas or names, the data types, the row count, the timestamps, a flag byte
+  * plus optional bitmaps, the column values and finally the aligned flag.
+  *
+  * @param buffer source buffer, positioned at the serialized device path
+  * @throws IllegalArgumentException if the serialized device path is illegal
+  */
+ public void subDeserialize(ByteBuffer buffer) {
+   try {
+     devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer));
+   } catch (IllegalPathException e) {
+     throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e);
+   }
+
+   int measurementSize = buffer.getInt();
+   measurements = new String[measurementSize];
+
+   boolean hasSchema = buffer.get() == 1;
+   if (hasSchema) {
+     // measurement names are recovered from the schemas themselves
+     this.measurementSchemas = new MeasurementSchema[measurementSize];
+     for (int i = 0; i < measurementSize; i++) {
+       measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
+       measurements[i] = measurementSchemas[i].getMeasurementId();
+     }
+   } else {
+     for (int i = 0; i < measurementSize; i++) {
+       measurements[i] = ReadWriteIOUtils.readString(buffer);
+     }
+   }
+
+   dataTypes = new TSDataType[measurementSize];
+   for (int i = 0; i < measurementSize; i++) {
+     dataTypes[i] = TSDataType.deserialize(buffer.get());
+   }
+
+   rowCount = buffer.getInt();
+   // readTimesFromBuffer hands back the array that is assigned here, so allocating one up front
+   // would only create garbage
+   times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount);
+
+   boolean hasBitMaps = BytesUtils.byteToBool(buffer.get());
+   if (hasBitMaps) {
+     bitMaps = QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize, rowCount);
+   }
+   columns =
+       QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes, measurementSize, rowCount);
+   isAligned = buffer.get() == 1;
+ }
+
+ // region serialize & deserialize methods for WAL
+ /** Serialized size for wal of the whole tablet, i.e. rows 0 (inclusive) to rowCount (exclusive) */
+ @Override
+ public int serializedSize() {
+ return serializedSize(0, rowCount);
+ }
+
+ /**
+  * Serialized size for wal of the rows from {@code start} (inclusive) to {@code end} (exclusive).
+  *
+  * <p>NOTE(review): the extra {@code Short.BYTES} presumably covers the plan node type tag that
+  * is written ahead of the body; confirm against serializeToWAL(buffer, start, end).
+  */
+ public int serializedSize(int start, int end) {
+ return Short.BYTES + subSerializeSize(start, end);
+ }
+
+ int subSerializeSize(int start, int end) {
+ int size = 0;
+ size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
+ // measurements size
+ size += Integer.BYTES;
+ size += serializeMeasurementSchemasSize();
+ // times size
+ size += Integer.BYTES;
+ size += Long.BYTES * (end - start);
+ // bitmaps size
+ size += Byte.BYTES;
+ if (bitMaps != null) {
+ for (int i = 0; i < bitMaps.length; i++) {
+ // ignore failed partial insert
+ if (measurements[i] == null) {
+ continue;
+ }
+
+ size += Byte.BYTES;
+ if (bitMaps[i] != null) {
+ int len = end - start;
+ BitMap partBitMap = new BitMap(len);
+ BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len);
+ size += partBitMap.getByteArray().length;
+ }
+ }
+ }
+ // values size
+ for (int i = 0; i < dataTypes.length; i++) {
+ if (columns[i] != null) {
+ size += getColumnSize(dataTypes[i], columns[i], start, end);
+ }
+ }
+
+ size += Byte.BYTES;
+ return size;
+ }
+
+ private int getColumnSize(TSDataType dataType, Object column, int start, int end) {
+ int size = 0;
+ switch (dataType) {
+ case INT32:
+ size += Integer.BYTES * (end - start);
+ break;
+ case INT64:
+ size += Long.BYTES * (end - start);
+ break;
+ case FLOAT:
+ size += Float.BYTES * (end - start);
+ break;
+ case DOUBLE:
+ size += Double.BYTES * (end - start);
+ break;
+ case BOOLEAN:
+ size += Byte.BYTES * (end - start);
+ break;
+ case TEXT:
+ Binary[] binaryValues = (Binary[]) column;
+ for (int j = start; j < end; j++) {
+ size += ReadWriteIOUtils.sizeToWrite(binaryValues[j]);
+ }
+ break;
+ }
+ return size;
+ }
+
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
serializeToWAL(buffer, 0, rowCount);
@@ -526,23 +613,18 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
void subSerialize(IWALByteBufferView buffer, int start, int end) {
WALWriteUtils.write(devicePath.getFullPath(), buffer);
- writeMeasurements(buffer);
- writeDataTypes(buffer);
+ // data types are serialized in measurement schemas
+ writeMeasurementSchemas(buffer);
writeTimes(buffer, start, end);
writeBitMaps(buffer, start, end);
writeValues(buffer, start, end);
buffer.put((byte) (isAligned ? 1 : 0));
}
- private void writeMeasurements(IWALByteBufferView buffer) {
- buffer.putInt(measurementSchemas.length);
- serializeMeasurementSchemaToWAL(buffer);
- }
-
- private void writeDataTypes(IWALByteBufferView buffer) {
- for (TSDataType dataType : dataTypes) {
- WALWriteUtils.write(dataType, buffer);
- }
+ /** Serialize measurement schemas, ignoring failed time series */
+ private void writeMeasurementSchemas(IWALByteBufferView buffer) {
+ buffer.putInt(measurements.length - getFailedMeasurementNumber());
+ serializeMeasurementSchemasToWAL(buffer);
}
private void writeTimes(IWALByteBufferView buffer, int start, int end) {
@@ -552,26 +634,36 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
}
+ /** Serialize bitmaps, ignoring failed time series */
private void writeBitMaps(IWALByteBufferView buffer, int start, int end) {
buffer.put(BytesUtils.boolToByte(bitMaps != null));
if (bitMaps != null) {
- for (int i = 0; i < measurements.length; i++) {
- BitMap bitMap = bitMaps[i];
- if (bitMap == null) {
+ for (int i = 0; i < bitMaps.length; i++) {
+ // ignore failed partial insert
+ if (measurements[i] == null) {
+ continue;
+ }
+
+ if (bitMaps[i] == null) {
buffer.put(BytesUtils.boolToByte(false));
} else {
buffer.put(BytesUtils.boolToByte(true));
int len = end - start;
BitMap partBitMap = new BitMap(len);
- BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
+ BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len);
buffer.put(partBitMap.getByteArray());
}
}
}
}
+ /** Serialize values, ignoring failed time series */
private void writeValues(IWALByteBufferView buffer, int start, int end) {
- for (int i = 0; i < dataTypes.length; i++) {
+ for (int i = 0; i < columns.length; i++) {
+ // ignore failed partial insert
+ if (measurements[i] == null) {
+ continue;
+ }
serializeColumn(dataTypes[i], columns[i], buffer, start, end);
}
}
@@ -621,94 +713,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
}
- private Object[] initTabletValues(int columnSize, int rowSize, TSDataType[] dataTypes) {
- Object[] values = new Object[columnSize];
- for (int i = 0; i < values.length; i++) {
- switch (dataTypes[i]) {
- case TEXT:
- values[i] = new Binary[rowSize];
- break;
- case FLOAT:
- values[i] = new float[rowSize];
- break;
- case INT32:
- values[i] = new int[rowSize];
- break;
- case INT64:
- values[i] = new long[rowSize];
- break;
- case DOUBLE:
- values[i] = new double[rowSize];
- break;
- case BOOLEAN:
- values[i] = new boolean[rowSize];
- break;
- }
- }
- return values;
- }
-
- private BitMap[] initBitmaps(int columnSize, int rowSize) {
- BitMap[] bitMaps = new BitMap[columnSize];
- for (int i = 0; i < columnSize; i++) {
- bitMaps[i] = new BitMap(rowSize);
- }
- return bitMaps;
- }
-
- public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
- InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
- insertNode.subDeserialize(byteBuffer);
- insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
- return insertNode;
- }
-
- public void subDeserialize(ByteBuffer buffer) {
- try {
- this.devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer));
- } catch (IllegalPathException e) {
- throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e);
- }
-
- int measurementSize = buffer.getInt();
- this.measurements = new String[measurementSize];
-
- boolean hasSchema = buffer.get() == 1;
-
- if (hasSchema) {
- this.measurementSchemas = new MeasurementSchema[measurementSize];
- for (int i = 0; i < measurementSize; i++) {
- measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
- measurements[i] = measurementSchemas[i].getMeasurementId();
- }
- } else {
- for (int i = 0; i < measurementSize; i++) {
- measurements[i] = ReadWriteIOUtils.readString(buffer);
- }
- }
-
- this.dataTypes = new TSDataType[measurementSize];
- for (int i = 0; i < measurementSize; i++) {
- dataTypes[i] = TSDataType.deserialize(buffer.get());
- }
-
- int rows = buffer.getInt();
- rowCount = rows;
- this.times = new long[rows];
- times = QueryDataSetUtils.readTimesFromBuffer(buffer, rows);
-
- boolean hasBitMaps = BytesUtils.byteToBool(buffer.get());
- if (hasBitMaps) {
- bitMaps = QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize, rows);
- }
- columns =
- QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes, measurementSize, rows);
- this.isAligned = buffer.get() == 1;
- }
-
+ /** Deserialize from wal */
public static InsertTabletNode deserialize(DataInputStream stream)
throws IllegalPathException, IOException {
- // This method is used for deserialize from wal
// we do not store plan node id in wal entry
InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
insertNode.subDeserialize(stream);
@@ -716,31 +723,32 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
}
private void subDeserialize(DataInputStream stream) throws IllegalPathException, IOException {
- this.devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
+ devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));
int measurementSize = stream.readInt();
- this.measurements = new String[measurementSize];
- this.measurementSchemas = new MeasurementSchema[measurementSize];
- deserializeMeasurementSchema(stream);
+ measurements = new String[measurementSize];
+ measurementSchemas = new MeasurementSchema[measurementSize];
+ deserializeMeasurementSchemas(stream);
- this.dataTypes = new TSDataType[measurementSize];
+ // data types are serialized in measurement schemas
+ dataTypes = new TSDataType[measurementSize];
for (int i = 0; i < measurementSize; i++) {
- dataTypes[i] = TSDataType.deserialize(stream.readByte());
+ dataTypes[i] = measurementSchemas[i].getType();
}
- int rows = stream.readInt();
- rowCount = rows;
- this.times = new long[rows];
- times = QueryDataSetUtils.readTimesFromStream(stream, rows);
+ rowCount = stream.readInt();
+ times = new long[rowCount];
+ times = QueryDataSetUtils.readTimesFromStream(stream, rowCount);
boolean hasBitMaps = BytesUtils.byteToBool(stream.readByte());
if (hasBitMaps) {
- bitMaps = QueryDataSetUtils.readBitMapsFromStream(stream, measurementSize, rows);
+ bitMaps = QueryDataSetUtils.readBitMapsFromStream(stream, measurementSize, rowCount);
}
columns =
- QueryDataSetUtils.readTabletValuesFromStream(stream, dataTypes, measurementSize, rows);
- this.isAligned = stream.readByte() == 1;
+ QueryDataSetUtils.readTabletValuesFromStream(stream, dataTypes, measurementSize, rowCount);
+ isAligned = stream.readByte() == 1;
}
+ // endregion
@Override
public int hashCode() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index b3d145ca83..f8b5778611 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
@@ -106,8 +105,7 @@ public class InternalServiceImpl implements InternalService.Iface {
PlanNode planNode = fragmentInstance.getFragment().getRoot();
if (planNode instanceof InsertNode) {
try {
- SchemaTree schemaTree = SchemaValidator.validate((InsertNode) planNode);
- ((InsertNode) planNode).setMeasurementSchemas(schemaTree);
+ SchemaValidator.validate((InsertNode) planNode);
} catch (SemanticException e) {
response.setAccepted(false);
response.setMessage(e.getMessage());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index 7551509bca..61be14f55e 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -62,12 +62,14 @@ public class MemUtils {
*/
public static long getRecordsSize(
List<TSDataType> dataTypes, Object[] value, boolean addingTextDataSize) {
+ int emptyRecordCount = 0;
long memSize = 0L;
- for (int i = 0; i < dataTypes.size(); i++) {
+ for (int i = 0; i < value.length; i++) {
if (value[i] == null) {
+ emptyRecordCount++;
continue;
}
- memSize += getRecordSize(dataTypes.get(i), value[i], addingTextDataSize);
+ memSize += getRecordSize(dataTypes.get(i - emptyRecordCount), value[i], addingTextDataSize);
}
return memSize;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
index 2fd1ae18b9..5eb016ced7 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
@@ -173,4 +173,21 @@ public class WALWriteUtils {
}
return len;
}
+
+ /**
+  * Number of bytes counted for a measurement schema: its measurement id, three single bytes, an
+  * int, and every key and value of the props map when one is present.
+  *
+  * <p>NOTE(review): the three bytes presumably stand for data type, encoding and compressor, and
+  * the int for the props count; confirm against the matching WAL writer.
+  */
+ public static int sizeToWrite(MeasurementSchema measurementSchema) {
+   int total =
+       ReadWriteIOUtils.sizeToWrite(measurementSchema.getMeasurementId()) + 3 * Byte.BYTES;
+
+   Map<String, String> props = measurementSchema.getProps();
+   // this int is counted whether or not props exist
+   total += Integer.BYTES;
+   if (props == null) {
+     return total;
+   }
+
+   for (Map.Entry<String, String> prop : props.entrySet()) {
+     total +=
+         ReadWriteIOUtils.sizeToWrite(prop.getKey())
+             + ReadWriteIOUtils.sizeToWrite(prop.getValue());
+   }
+   return total;
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertMultiTabletsNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertMultiTabletsNodeSerdeTest.java
index 0fe2113d57..97b00cb218 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertMultiTabletsNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertMultiTabletsNodeSerdeTest.java
@@ -21,28 +21,22 @@ package org.apache.iotdb.db.mpp.plan.plan.node.write;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
-import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
import java.nio.ByteBuffer;
public class InsertMultiTabletsNodeSerdeTest {
@Test
- public void testInsertMultiTabletPlan()
- throws QueryProcessException, MetadataException, InterruptedException,
- QueryFilterOptimizationException, StorageEngineException, IOException {
+ public void testInsertMultiTabletPlan() throws MetadataException {
long[] times = new long[] {110L, 111L, 112L, 113L};
TSDataType[] dataTypes =
new TSDataType[] {
@@ -96,6 +90,6 @@ public class InsertMultiTabletsNodeSerdeTest {
Assert.assertEquals(PlanNodeType.INSERT_MULTI_TABLET.ordinal(), byteBuffer.getShort());
- Assert.assertEquals(InsertMultiTabletsNode.deserialize(byteBuffer), insertMultiTabletsNode);
+ Assert.assertEquals(insertMultiTabletsNode, InsertMultiTabletsNode.deserialize(byteBuffer));
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
index 0dd4ae51f3..12a8120013 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowNodeSerdeTest.java
@@ -48,7 +48,7 @@ public class InsertRowNodeSerdeTest {
Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort());
- Assert.assertEquals(InsertRowNode.deserialize(byteBuffer), insertRowNode);
+ Assert.assertEquals(insertRowNode, InsertRowNode.deserialize(byteBuffer));
insertRowNode = getInsertRowNodeWithMeasurementSchemas();
byteBuffer = ByteBuffer.allocate(10000);
@@ -57,7 +57,7 @@ public class InsertRowNodeSerdeTest {
Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort());
- Assert.assertEquals(InsertRowNode.deserialize(byteBuffer), insertRowNode);
+ Assert.assertEquals(insertRowNode, InsertRowNode.deserialize(byteBuffer));
insertRowNode = getInsertRowNodeWithStringValue();
byteBuffer = ByteBuffer.allocate(10000);
@@ -66,7 +66,7 @@ public class InsertRowNodeSerdeTest {
Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort());
- Assert.assertEquals(InsertRowNode.deserialize(byteBuffer), insertRowNode);
+ Assert.assertEquals(insertRowNode, InsertRowNode.deserialize(byteBuffer));
}
@Test
@@ -75,25 +75,20 @@ public class InsertRowNodeSerdeTest {
int serializedSize = insertRowNode.serializedSize();
- Assert.assertEquals(serializedSize, 125);
-
byte[] bytes = new byte[serializedSize];
WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes));
insertRowNode.serializeToWAL(walBuffer);
+ Assert.assertFalse(walBuffer.getBuffer().hasRemaining());
DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), dataInputStream.readShort());
InsertRowNode tmpNode = InsertRowNode.deserialize(dataInputStream);
+ tmpNode.setPlanNodeId(insertRowNode.getPlanNodeId());
- Assert.assertEquals(tmpNode.getTime(), insertRowNode.getTime());
- Assert.assertEquals(tmpNode.getDevicePath(), insertRowNode.getDevicePath());
- Assert.assertEquals(tmpNode.isAligned(), insertRowNode.isAligned());
- Assert.assertArrayEquals(tmpNode.getValues(), insertRowNode.getValues());
- Assert.assertArrayEquals(
- tmpNode.getMeasurementSchemas(), insertRowNode.getMeasurementSchemas());
+ Assert.assertEquals(insertRowNode, tmpNode);
}
private InsertRowNode getInsertRowNode() throws IllegalPathException {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsNodeSerdeTest.java
index 4c3744e2f2..1e3033367e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsNodeSerdeTest.java
@@ -67,6 +67,6 @@ public class InsertRowsNodeSerdeTest {
Assert.assertEquals(PlanNodeType.INSERT_ROWS.ordinal(), byteBuffer.getShort());
- Assert.assertEquals(InsertRowsNode.deserialize(byteBuffer), node);
+ Assert.assertEquals(node, InsertRowsNode.deserialize(byteBuffer));
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
index f657b7cc03..155f80c116 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
@@ -76,6 +76,6 @@ public class InsertRowsOfOneDeviceNodeSerdeTest {
Assert.assertEquals(PlanNodeType.INSERT_ROWS_OF_ONE_DEVICE.ordinal(), byteBuffer.getShort());
- Assert.assertEquals(InsertRowsOfOneDeviceNode.deserialize(byteBuffer), node);
+ Assert.assertEquals(node, InsertRowsOfOneDeviceNode.deserialize(byteBuffer));
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
index 0dfeae490e..b1c73bc080 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/InsertTabletNodeSerdeTest.java
@@ -48,7 +48,7 @@ public class InsertTabletNodeSerdeTest {
Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), byteBuffer.getShort());
- Assert.assertEquals(InsertTabletNode.deserialize(byteBuffer), insertTabletNode);
+ Assert.assertEquals(insertTabletNode, InsertTabletNode.deserialize(byteBuffer));
insertTabletNode = getInsertTabletNodeWithSchema();
byteBuffer = ByteBuffer.allocate(10000);
@@ -57,7 +57,7 @@ public class InsertTabletNodeSerdeTest {
Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), byteBuffer.getShort());
- Assert.assertEquals(InsertTabletNode.deserialize(byteBuffer), insertTabletNode);
+ Assert.assertEquals(insertTabletNode, InsertTabletNode.deserialize(byteBuffer));
}
@Test
@@ -66,26 +66,20 @@ public class InsertTabletNodeSerdeTest {
int serializedSize = insertTabletNode.serializedSize();
- Assert.assertEquals(229, serializedSize);
-
byte[] bytes = new byte[serializedSize];
WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes));
insertTabletNode.serializeToWAL(walBuffer);
+ Assert.assertFalse(walBuffer.getBuffer().hasRemaining());
DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), dataInputStream.readShort());
InsertTabletNode tmpNode = InsertTabletNode.deserialize(dataInputStream);
+ tmpNode.setPlanNodeId(insertTabletNode.getPlanNodeId());
- Assert.assertArrayEquals(tmpNode.getTimes(), insertTabletNode.getTimes());
- Assert.assertEquals(tmpNode.getDevicePath(), insertTabletNode.getDevicePath());
- Assert.assertEquals(tmpNode.isAligned(), insertTabletNode.isAligned());
- Assert.assertArrayEquals(tmpNode.getColumns(), insertTabletNode.getColumns());
- Assert.assertArrayEquals(tmpNode.getBitMaps(), insertTabletNode.getBitMaps());
- Assert.assertArrayEquals(
- tmpNode.getMeasurementSchemas(), insertTabletNode.getMeasurementSchemas());
+ Assert.assertEquals(insertTabletNode, tmpNode);
}
private InsertTabletNode getInsertTabletNode() throws IllegalPathException {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
index 670b8a7dcb..d774b36635 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.wal.io;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
@@ -30,6 +33,7 @@ import org.apache.iotdb.db.wal.utils.WALByteBufferForTest;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
@@ -66,6 +70,8 @@ public class WALFileTest {
public void testReadNormalFile() throws IOException, IllegalPathException {
int fakeMemTableId = 1;
List<WALEntry> expectedWALEntries = new ArrayList<>();
+ expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowNode(devicePath)));
+ expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletNode(devicePath)));
expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowPlan(devicePath)));
expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletPlan(devicePath)));
expectedWALEntries.add(new WALEntry(fakeMemTableId, getDeletePlan(devicePath)));
@@ -111,6 +117,8 @@ public class WALFileTest {
public void testReadBrokenFile() throws IOException, IllegalPathException {
int fakeMemTableId = 1;
List<WALEntry> expectedWALEntries = new ArrayList<>();
+ expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowNode(devicePath)));
+ expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletNode(devicePath)));
expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertRowPlan(devicePath)));
expectedWALEntries.add(new WALEntry(fakeMemTableId, getInsertTabletPlan(devicePath)));
expectedWALEntries.add(new WALEntry(fakeMemTableId, getDeletePlan(devicePath)));
@@ -217,6 +225,112 @@ public class WALFileTest {
return insertTabletPlan;
}
+ public static InsertRowNode getInsertRowNode(String devicePath) throws IllegalPathException {
+ long time = 110L;
+ TSDataType[] dataTypes =
+ new TSDataType[] {
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT64,
+ TSDataType.INT32,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT
+ };
+
+ Object[] columns = new Object[6];
+ columns[0] = 1.0;
+ columns[1] = 2.0f;
+ columns[2] = 10000L;
+ columns[3] = 100;
+ columns[4] = false;
+ columns[5] = new Binary("hh" + 0);
+
+ InsertRowNode insertRowNode =
+ new InsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath(devicePath),
+ false,
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes,
+ time,
+ columns,
+ false);
+
+ insertRowNode.setMeasurementSchemas(
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.DOUBLE),
+ new MeasurementSchema("s2", TSDataType.FLOAT),
+ new MeasurementSchema("s3", TSDataType.INT64),
+ new MeasurementSchema("s4", TSDataType.INT32),
+ new MeasurementSchema("s5", TSDataType.BOOLEAN),
+ new MeasurementSchema("s6", TSDataType.TEXT)
+ });
+ return insertRowNode;
+ }
+
+ public static InsertTabletNode getInsertTabletNode(String devicePath)
+ throws IllegalPathException {
+ long[] times = new long[] {110L, 111L, 112L, 113L};
+ TSDataType[] dataTypes =
+ new TSDataType[] {
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT64,
+ TSDataType.INT32,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT
+ };
+
+ Object[] columns = new Object[6];
+ columns[0] = new double[4];
+ columns[1] = new float[4];
+ columns[2] = new long[4];
+ columns[3] = new int[4];
+ columns[4] = new boolean[4];
+ columns[5] = new Binary[4];
+
+ for (int r = 0; r < 4; r++) {
+ ((double[]) columns[0])[r] = 1.0 + r;
+ ((float[]) columns[1])[r] = 2 + r;
+ ((long[]) columns[2])[r] = 10000 + r;
+ ((int[]) columns[3])[r] = 100 + r;
+ ((boolean[]) columns[4])[r] = (r % 2 == 0);
+ ((Binary[]) columns[5])[r] = new Binary("hh" + r);
+ }
+
+ BitMap[] bitMaps = new BitMap[dataTypes.length];
+ for (int i = 0; i < dataTypes.length; i++) {
+ if (bitMaps[i] == null) {
+ bitMaps[i] = new BitMap(times.length);
+ }
+ bitMaps[i].mark(i % times.length);
+ }
+
+ InsertTabletNode insertTabletNode =
+ new InsertTabletNode(
+ new PlanNodeId(""),
+ new PartialPath(devicePath),
+ false,
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes,
+ times,
+ bitMaps,
+ columns,
+ times.length);
+
+ insertTabletNode.setMeasurementSchemas(
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.DOUBLE),
+ new MeasurementSchema("s2", TSDataType.FLOAT),
+ new MeasurementSchema("s3", TSDataType.INT64),
+ new MeasurementSchema("s4", TSDataType.INT32),
+ new MeasurementSchema("s5", TSDataType.BOOLEAN),
+ new MeasurementSchema("s6", TSDataType.TEXT)
+ });
+
+ return insertTabletNode;
+ }
+
public static DeletePlan getDeletePlan(String devicePath) throws IllegalPathException {
return new DeletePlan(Long.MIN_VALUE, Long.MAX_VALUE, new PartialPath(devicePath));
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java
index 97ed0b7397..747ea23323 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.tsfile.utils;
import java.util.Arrays;
+import java.util.Objects;
public class BitMap {
private static final byte[] BIT_UTIL = new byte[] {1, 2, 4, 8, 16, 32, 64, -128};
@@ -124,6 +125,28 @@ public class BitMap {
return res.toString();
}
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(size);
+ result = 31 * result + Arrays.hashCode(bits);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof BitMap)) {
+ return false;
+ }
+ BitMap other = (BitMap) obj;
+ return this.size == other.size && Arrays.equals(this.bits, other.bits);
+ }
+
@Override
public BitMap clone() {
byte[] cloneBytes = new byte[this.bits.length];