You are viewing a plain-text version of this content. (The canonical link to the original was a hyperlink and is not preserved in this text.)
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/03/14 09:01:35 UTC
[iotdb] 03/03: [IOTDB-1228] Refactor PlanExecutor.insertTablet method to support aligned timeseries
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch insertTablet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f39bc420b766758e842dbead0bf1584142f2793c
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sun Mar 14 16:57:54 2021 +0800
[IOTDB-1228] Refactor PlanExecutor.insertTablet method to support aligned timeseries
---
.../org/apache/iotdb/db/metadata/MManager.java | 123 ++++++++++++++++-----
.../org/apache/iotdb/db/metadata/MetaUtils.java | 3 +-
.../iotdb/db/qp/physical/InsertTabletPlanTest.java | 51 +++++++++
.../write/schema/VectorMeasurementSchema.java | 5 +-
4 files changed, 150 insertions(+), 32 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 2bca513..6ddafbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -92,7 +92,6 @@ import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.RandomDeleteCache;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.cache.CacheException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -107,6 +106,34 @@ import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
/**
* This class takes the responsibility of serialization of all the metadata info and persistent it
* into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -1928,9 +1955,10 @@ public class MManager {
Pair<MNode, Template> deviceMNode = getDeviceNodeWithAutoCreate(deviceId);
// 2. get schema of each measurement
- // if do not has measurement
+ // if the measurement does not exist
MeasurementMNode measurementMNode;
- TSDataType dataType;
+ int loc = 0;
+
for (int i = 0; i < measurementList.length; i++) {
try {
MNode child = getMNode(deviceMNode.left, measurementList[i]);
@@ -1943,27 +1971,70 @@ public class MManager {
throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurementList[i]);
} else {
// child is null or child is type of MNode
- dataType = getTypeInLoc(plan, i);
- // create it, may concurrent created by multiple thread
- internalCreateTimeseries(deviceId.concatNode(measurementList[i]), dataType);
- measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurementList[i]);
+ // get the data type from the plan; only InsertRowPlan and InsertTabletPlan are supported
+ if (plan instanceof InsertRowPlan) {
+ TSDataType dataType = plan.getDataTypes()[i];
+ // create it; it may be created concurrently by multiple threads
+ internalCreateTimeseries(deviceId.concatNode(measurementList[i]), dataType);
+ measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurementList[i]);
+ } else if (plan instanceof InsertTabletPlan) {
+ List<TSDataType> dataTypes = new ArrayList<>();
+ List<String> measurements =
+ Arrays.asList(measurementList[i].replace("(", "").replace(")", "").split(","));
+ if (measurements.size() == 1) {
+ internalCreateTimeseries(
+ deviceId.concatNode(measurementList[i]), plan.getDataTypes()[loc]);
+ measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurementList[i]);
+ } else {
+ for (int j = 0; j < measurements.size(); j++) {
+ dataTypes.add(plan.getDataTypes()[loc]);
+ loc++;
+ }
+ internalAlignedCreateTimeseries(deviceId, measurements, dataTypes);
+ measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurements.get(0));
+ }
+ } else {
+ throw new MetadataException(
+ String.format(
+ "Only support insertRow and insertTablet, plan is [%s]",
+ plan.getOperatorType()));
+ }
}
}
// check type is match
+ boolean mismatch = false;
TSDataType insertDataType = null;
if (plan instanceof InsertRowPlan) {
if (!((InsertRowPlan) plan).isNeedInferType()) {
- // only when InsertRowPlan's values is object[], we should check type
- insertDataType = getTypeInLoc(plan, i);
+ // only check the type when the InsertRowPlan's values are an Object[] (no type inference needed)
+ insertDataType = plan.getDataTypes()[i];
} else {
insertDataType = measurementMNode.getSchema().getType();
}
+ mismatch = measurementMNode.getSchema().getType() != insertDataType;
} else if (plan instanceof InsertTabletPlan) {
- insertDataType = getTypeInLoc(plan, i);
+ int measurementSize = measurementList[i].split(",").length;
+ loc -= measurementSize;
+ if (measurementSize == 1) {
+ insertDataType = measurementMNode.getSchema().getType();
+ mismatch = measurementMNode.getSchema().getType() != insertDataType;
+ } else {
+ for (int j = 0; j < measurementSize; j++) {
+ TSDataType dataTypeInNode =
+ measurementMNode.getSchema().getValueTSDataTypeList().get(j);
+ insertDataType = plan.getDataTypes()[loc];
+ if (dataTypeInNode != insertDataType) {
+ mismatch = true;
+ insertDataType = measurementMNode.getSchema().getType();
+ break;
+ }
+ loc++;
+ }
+ }
}
- if (measurementMNode.getSchema().getType() != insertDataType) {
+ if (mismatch) {
logger.warn(
"DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
measurementList[i],
@@ -1985,7 +2056,6 @@ public class MManager {
// set measurementName instead of alias
measurementList[i] = measurementMNode.getName();
-
} catch (MetadataException e) {
logger.warn(
"meet error when check {}.{}, message: {}",
@@ -2008,7 +2078,7 @@ public class MManager {
return deviceMNode.getChild(measurementName);
}
- /** create timeseries with ignore PathAlreadyExistException */
+ /** create timeseries ignoring PathAlreadyExistException */
private void internalCreateTimeseries(PartialPath path, TSDataType dataType)
throws MetadataException {
createTimeseries(
@@ -2019,21 +2089,20 @@ public class MManager {
Collections.emptyMap());
}
- /** get dataType of plan, in loc measurements only support InsertRowPlan and InsertTabletPlan */
- private TSDataType getTypeInLoc(InsertPlan plan, int loc) throws MetadataException {
- TSDataType dataType;
- if (plan instanceof InsertRowPlan) {
- InsertRowPlan tPlan = (InsertRowPlan) plan;
- dataType =
- TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
- } else if (plan instanceof InsertTabletPlan) {
- dataType = (plan).getDataTypes()[loc];
- } else {
- throw new MetadataException(
- String.format(
- "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
+ /** create aligned timeseries ignoring PathAlreadyExistException */
+ private void internalAlignedCreateTimeseries(
+ PartialPath devicePath, List<String> measurements, List<TSDataType> dataTypes)
+ throws MetadataException {
+ List<TSEncoding> encodings = new ArrayList<>();
+ for (TSDataType dataType : dataTypes) {
+ encodings.add(getDefaultEncoding(dataType));
}
- return dataType;
+ createAlignedTimeSeries(
+ devicePath,
+ measurements,
+ dataTypes,
+ encodings,
+ TSFileDescriptor.getInstance().getConfig().getCompressor());
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java
index a02f5be..fab9114 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java
@@ -115,7 +115,8 @@ public class MetaUtils {
*/
public static List<String> getMeasurementsInPartialPath(PartialPath fullPath) {
if (fullPath.getMeasurement().contains("(") && fullPath.getMeasurement().contains(",")) {
- return (Arrays.asList(fullPath.getMeasurement().split("\\(")[1].split("\\)")[0].split(",")));
+ return (Arrays.asList(
+ fullPath.getMeasurement().replace("(", "").replace(")", "").split(",")));
} else {
return Arrays.asList(fullPath.getMeasurement());
}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
index 52b5b64..1246eea 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
@@ -106,4 +106,55 @@ public class InsertTabletPlanTest {
Assert.assertEquals(6, record.getFields().size());
}
}
+
+ @Test
+ public void testInsertTabletPlanWithAlignedTimeseries()
+ throws QueryProcessException, MetadataException, InterruptedException,
+ QueryFilterOptimizationException, StorageEngineException, IOException {
+ long[] times = new long[] {110L, 111L, 112L, 113L};
+ List<Integer> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.DOUBLE.ordinal());
+ dataTypes.add(TSDataType.FLOAT.ordinal());
+ dataTypes.add(TSDataType.INT64.ordinal());
+ dataTypes.add(TSDataType.INT32.ordinal());
+ dataTypes.add(TSDataType.BOOLEAN.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+
+ Object[] columns = new Object[6];
+ columns[0] = new double[4];
+ columns[1] = new float[4];
+ columns[2] = new long[4];
+ columns[3] = new int[4];
+ columns[4] = new boolean[4];
+ columns[5] = new Binary[4];
+
+ for (int r = 0; r < 4; r++) {
+ ((double[]) columns[0])[r] = 1.0;
+ ((float[]) columns[1])[r] = 2;
+ ((long[]) columns[2])[r] = 10000;
+ ((int[]) columns[3])[r] = 100;
+ ((boolean[]) columns[4])[r] = false;
+ ((Binary[]) columns[5])[r] = new Binary("hh" + r);
+ }
+
+ InsertTabletPlan tabletPlan =
+ new InsertTabletPlan(
+ new PartialPath("root.isp.d1"),
+ new String[] {"(s1,s2,s3)", "(s4,s5)", "s6"},
+ dataTypes);
+ tabletPlan.setTimes(times);
+ tabletPlan.setColumns(columns);
+ tabletPlan.setRowCount(times.length);
+
+ PlanExecutor executor = new PlanExecutor();
+ executor.insertTablet(tabletPlan);
+
+ QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
+ QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(6, dataSet.getPaths().size());
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(6, record.getFields().size());
+ }
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
index 0411be9..def8386 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
@@ -318,10 +318,7 @@ public class VectorMeasurementSchema
TSDataType.deserialize(types[i]).toString(),
",",
TSEncoding.deserialize(encodings[i]).toString());
- sc.addTail("]");
- if (i != measurements.length - 1) {
- sc.addTail(", ");
- }
+ sc.addTail("],");
}
sc.addTail(CompressionType.deserialize(compressor).toString());
return sc.toString();