You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/03/16 02:54:18 UTC
[iotdb] 01/01: [IOTDB-1235] Refactor createMultiTimeseries
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch createMultiTimeseries
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 979c2a61523ce46062105a500d89aa46d58d1c22
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Tue Mar 16 10:53:36 2021 +0800
[IOTDB-1235] Refactor createMultiTimeseries
---
.../java/org/apache/iotdb/db/metadata/MTree.java | 2 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 59 +++++++++++++++++-----
.../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 7 ++-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 10 ++--
.../db/qp/physical/PhysicalPlanSerializeTest.java | 26 ++++++++++
.../java/org/apache/iotdb/session/Session.java | 4 +-
6 files changed, 85 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 42bb0ac..389071f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -1339,7 +1339,7 @@ public class MTree implements Serializable {
tsRow[2] = schema.getValueTSDataTypeList().get(i).toString();
tsRow[3] = schema.getValueTSEncodingList().get(i).toString();
tsRow[4] = schema.getCompressor().toString();
- tsRow[5] = "0";
+ tsRow[5] = "-1";
tsRow[6] =
needLast
? String.valueOf(getLastTimeStamp((MeasurementMNode) node, queryContext))
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index b97343c..75a2369 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.MetaUtils;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
@@ -129,6 +130,7 @@ import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -1376,24 +1378,53 @@ public class PlanExecutor implements IPlanExecutor {
private boolean createMultiTimeSeries(CreateMultiTimeSeriesPlan multiPlan)
throws BatchProcessException {
+ int dataTypeIdx = 0;
for (int i = 0; i < multiPlan.getPaths().size(); i++) {
if (multiPlan.getResults().containsKey(i)) {
continue;
}
- CreateTimeSeriesPlan plan =
- new CreateTimeSeriesPlan(
- multiPlan.getPaths().get(i),
- multiPlan.getDataTypes().get(i),
- multiPlan.getEncodings().get(i),
- multiPlan.getCompressors().get(i),
- multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
- multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
- multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
- multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
- try {
- createTimeSeries(plan);
- } catch (QueryProcessException e) {
- multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ PartialPath path = multiPlan.getPaths().get(i);
+ String measurement = path.getMeasurement();
+ if (measurement.contains("(") && measurement.contains(",")) {
+ PartialPath devicePath = path.getDevicePath();
+ List<String> measurements = MetaUtils.getMeasurementsInPartialPath(path);
+ List<TSDataType> dataTypes = new ArrayList<>();
+ List<TSEncoding> encodings = new ArrayList<>();
+ for (int j = 0; j < measurements.size(); j++) {
+ dataTypes.add(multiPlan.getDataTypes().get(dataTypeIdx));
+ encodings.add(multiPlan.getEncodings().get(dataTypeIdx));
+ dataTypeIdx++;
+ }
+ CreateAlignedTimeSeriesPlan plan =
+ new CreateAlignedTimeSeriesPlan(
+ devicePath,
+ measurements,
+ dataTypes,
+ encodings,
+ multiPlan.getCompressors().get(i),
+ Collections.emptyList());
+ try {
+ createAlignedTimeSeries(plan);
+ } catch (QueryProcessException e) {
+ multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
+ } else {
+ CreateTimeSeriesPlan plan =
+ new CreateTimeSeriesPlan(
+ multiPlan.getPaths().get(i),
+ multiPlan.getDataTypes().get(i),
+ multiPlan.getEncodings().get(i),
+ multiPlan.getCompressors().get(i),
+ multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
+ multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
+ multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
+ multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
+ dataTypeIdx++;
+ try {
+ createTimeSeries(plan);
+ } catch (QueryProcessException e) {
+ multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
}
}
if (!multiPlan.getResults().isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
index 9ebc54b..ad7284f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
@@ -152,6 +152,7 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
int type = PhysicalPlanType.CREATE_MULTI_TIMESERIES.ordinal();
stream.write(type);
stream.writeInt(paths.size());
+ stream.writeInt(dataTypes.size()); // size of datatypes, encodings for aligned timeseries
for (PartialPath path : paths) {
putString(stream, path.getFullPath());
@@ -209,6 +210,7 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
int type = PhysicalPlanType.CREATE_MULTI_TIMESERIES.ordinal();
buffer.put((byte) type);
buffer.putInt(paths.size());
+ buffer.putInt(dataTypes.size()); // size of datatypes, encodings for aligned timeseries
for (PartialPath path : paths) {
putString(buffer, path.getFullPath());
@@ -264,16 +266,17 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
@Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
int totalSize = buffer.getInt();
+ int dataTypeSize = buffer.getInt();
paths = new ArrayList<>(totalSize);
for (int i = 0; i < totalSize; i++) {
paths.add(new PartialPath(readString(buffer)));
}
dataTypes = new ArrayList<>(totalSize);
- for (int i = 0; i < totalSize; i++) {
+ for (int i = 0; i < dataTypeSize; i++) {
dataTypes.add(TSDataType.values()[buffer.get()]);
}
encodings = new ArrayList<>(totalSize);
- for (int i = 0; i < totalSize; i++) {
+ for (int i = 0; i < dataTypeSize; i++) {
encodings.add(TSEncoding.values()[buffer.get()]);
}
compressors = new ArrayList<>(totalSize);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 81a95e7..02f0621 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1655,8 +1655,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
CreateMultiTimeSeriesPlan multiPlan = new CreateMultiTimeSeriesPlan();
List<PartialPath> paths = new ArrayList<>(req.paths.size());
- List<TSDataType> dataTypes = new ArrayList<>(req.paths.size());
- List<TSEncoding> encodings = new ArrayList<>(req.paths.size());
+ List<TSDataType> dataTypes = new ArrayList<>(req.dataTypes.size());
+ List<TSEncoding> encodings = new ArrayList<>(req.dataTypes.size());
List<CompressionType> compressors = new ArrayList<>(req.paths.size());
List<String> alias = null;
if (req.measurementAliasList != null) {
@@ -1687,8 +1687,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
paths.add(new PartialPath(req.paths.get(i)));
- dataTypes.add(TSDataType.values()[req.dataTypes.get(i)]);
- encodings.add(TSEncoding.values()[req.encodings.get(i)]);
compressors.add(CompressionType.values()[req.compressors.get(i)]);
if (alias != null) {
alias.add(req.measurementAliasList.get(i));
@@ -1703,6 +1701,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
attributes.add(req.attributesList.get(i));
}
}
+ for (int i = 0; i < req.dataTypes.size(); i++) {
+ dataTypes.add(TSDataType.values()[req.dataTypes.get(i)]);
+ encodings.add(TSEncoding.values()[req.encodings.get(i)]);
+ }
multiPlan.setPaths(paths);
multiPlan.setDataTypes(dataTypes);
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java
index 9a3eee3..e4f570e 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java
@@ -255,6 +255,32 @@ public class PhysicalPlanSerializeTest {
}
@Test
+ public void createMuSerializeTest3() throws IOException, IllegalPathException {
+ // same as:
+ // create timeseries root.sg.d1.s0 with datatype=DOUBLE, encoding=GORILLA, compression=SNAPPY
+ // create aligned timeseries root.sg.d1.(s1 INT64, s2 DOUBLE, s3 INT64)
+ // with encoding=(GORILLA, GORILLA, GORILLA), compression=SNAPPY
+ CreateMultiTimeSeriesPlan plan = new CreateMultiTimeSeriesPlan();
+ plan.setPaths(
+ Arrays.asList(new PartialPath("root.sg.d1.s0"), new PartialPath("root.sg.d1.(s1,s2,s3)")));
+ plan.setDataTypes(
+ Arrays.asList(TSDataType.DOUBLE, TSDataType.INT64, TSDataType.DOUBLE, TSDataType.INT64));
+ plan.setEncodings(
+ Arrays.asList(
+ TSEncoding.GORILLA, TSEncoding.GORILLA, TSEncoding.GORILLA, TSEncoding.GORILLA));
+ plan.setCompressors(Arrays.asList(CompressionType.SNAPPY, CompressionType.SNAPPY));
+ plan.setProps(null);
+ plan.setTags(null);
+ plan.setAttributes(null);
+ plan.setAlias(null);
+
+ PhysicalPlan result = testTwoSerializeMethodAndDeserialize(plan);
+
+ Assert.assertEquals(OperatorType.CREATE_MULTI_TIMESERIES, result.getOperatorType());
+ Assert.assertEquals(plan, result);
+ }
+
+ @Test
public void AlterTimeSeriesPlanSerializeTest() throws IOException, IllegalPathException {
AlterTimeSeriesPlan alterTimeSeriesPlan =
new AlterTimeSeriesPlan(
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 9167d0a..b829cd0 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -438,13 +438,13 @@ public class Session {
request.setPaths(paths);
- List<Integer> dataTypeOrdinals = new ArrayList<>(paths.size());
+ List<Integer> dataTypeOrdinals = new ArrayList<>(dataTypes.size());
for (TSDataType dataType : dataTypes) {
dataTypeOrdinals.add(dataType.ordinal());
}
request.setDataTypes(dataTypeOrdinals);
- List<Integer> encodingOrdinals = new ArrayList<>(paths.size());
+ List<Integer> encodingOrdinals = new ArrayList<>(dataTypes.size());
for (TSEncoding encoding : encodings) {
encodingOrdinals.add(encoding.ordinal());
}