You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/03/16 02:54:18 UTC

[iotdb] 01/01: [IOTDB-1235] Refactor createMultiTimeseries

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch createMultiTimeseries
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 979c2a61523ce46062105a500d89aa46d58d1c22
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Tue Mar 16 10:53:36 2021 +0800

    [IOTDB-1235] Refactor createMultiTimeseries
---
 .../java/org/apache/iotdb/db/metadata/MTree.java   |  2 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 59 +++++++++++++++++-----
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java |  7 ++-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 10 ++--
 .../db/qp/physical/PhysicalPlanSerializeTest.java  | 26 ++++++++++
 .../java/org/apache/iotdb/session/Session.java     |  4 +-
 6 files changed, 85 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 42bb0ac..389071f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -1339,7 +1339,7 @@ public class MTree implements Serializable {
         tsRow[2] = schema.getValueTSDataTypeList().get(i).toString();
         tsRow[3] = schema.getValueTSEncodingList().get(i).toString();
         tsRow[4] = schema.getCompressor().toString();
-        tsRow[5] = "0";
+        tsRow[5] = "-1";
         tsRow[6] =
             needLast
                 ? String.valueOf(getLastTimeStamp((MeasurementMNode) node, queryContext))
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index b97343c..75a2369 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.MetaUtils;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
@@ -129,6 +130,7 @@ import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -1376,24 +1378,53 @@ public class PlanExecutor implements IPlanExecutor {
 
   private boolean createMultiTimeSeries(CreateMultiTimeSeriesPlan multiPlan)
       throws BatchProcessException {
+    int dataTypeIdx = 0;
     for (int i = 0; i < multiPlan.getPaths().size(); i++) {
       if (multiPlan.getResults().containsKey(i)) {
         continue;
       }
-      CreateTimeSeriesPlan plan =
-          new CreateTimeSeriesPlan(
-              multiPlan.getPaths().get(i),
-              multiPlan.getDataTypes().get(i),
-              multiPlan.getEncodings().get(i),
-              multiPlan.getCompressors().get(i),
-              multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
-              multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
-              multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
-              multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
-      try {
-        createTimeSeries(plan);
-      } catch (QueryProcessException e) {
-        multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      PartialPath path = multiPlan.getPaths().get(i);
+      String measurement = path.getMeasurement();
+      if (measurement.contains("(") && measurement.contains(",")) {
+        PartialPath devicePath = path.getDevicePath();
+        List<String> measurements = MetaUtils.getMeasurementsInPartialPath(path);
+        List<TSDataType> dataTypes = new ArrayList<>();
+        List<TSEncoding> encodings = new ArrayList<>();
+        for (int j = 0; j < measurements.size(); j++) {
+          dataTypes.add(multiPlan.getDataTypes().get(dataTypeIdx));
+          encodings.add(multiPlan.getEncodings().get(dataTypeIdx));
+          dataTypeIdx++;
+        }
+        CreateAlignedTimeSeriesPlan plan =
+            new CreateAlignedTimeSeriesPlan(
+                devicePath,
+                measurements,
+                dataTypes,
+                encodings,
+                multiPlan.getCompressors().get(i),
+                Collections.emptyList());
+        try {
+          createAlignedTimeSeries(plan);
+        } catch (QueryProcessException e) {
+          multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+        }
+      } else {
+        CreateTimeSeriesPlan plan =
+            new CreateTimeSeriesPlan(
+                multiPlan.getPaths().get(i),
+                multiPlan.getDataTypes().get(i),
+                multiPlan.getEncodings().get(i),
+                multiPlan.getCompressors().get(i),
+                multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
+                multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
+                multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
+                multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
+        dataTypeIdx++;
+        try {
+          createTimeSeries(plan);
+        } catch (QueryProcessException e) {
+          multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+        }
       }
     }
     if (!multiPlan.getResults().isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
index 9ebc54b..ad7284f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
@@ -152,6 +152,7 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
     int type = PhysicalPlanType.CREATE_MULTI_TIMESERIES.ordinal();
     stream.write(type);
     stream.writeInt(paths.size());
+    stream.writeInt(dataTypes.size()); // size of datatypes, encodings for aligned timeseries
 
     for (PartialPath path : paths) {
       putString(stream, path.getFullPath());
@@ -209,6 +210,7 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
     int type = PhysicalPlanType.CREATE_MULTI_TIMESERIES.ordinal();
     buffer.put((byte) type);
     buffer.putInt(paths.size());
+    buffer.putInt(dataTypes.size()); // size of datatypes, encodings for aligned timeseries
 
     for (PartialPath path : paths) {
       putString(buffer, path.getFullPath());
@@ -264,16 +266,17 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
   @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     int totalSize = buffer.getInt();
+    int dataTypeSize = buffer.getInt();
     paths = new ArrayList<>(totalSize);
     for (int i = 0; i < totalSize; i++) {
       paths.add(new PartialPath(readString(buffer)));
     }
     dataTypes = new ArrayList<>(totalSize);
-    for (int i = 0; i < totalSize; i++) {
+    for (int i = 0; i < dataTypeSize; i++) {
       dataTypes.add(TSDataType.values()[buffer.get()]);
     }
     encodings = new ArrayList<>(totalSize);
-    for (int i = 0; i < totalSize; i++) {
+    for (int i = 0; i < dataTypeSize; i++) {
       encodings.add(TSEncoding.values()[buffer.get()]);
     }
     compressors = new ArrayList<>(totalSize);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 81a95e7..02f0621 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1655,8 +1655,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
       CreateMultiTimeSeriesPlan multiPlan = new CreateMultiTimeSeriesPlan();
       List<PartialPath> paths = new ArrayList<>(req.paths.size());
-      List<TSDataType> dataTypes = new ArrayList<>(req.paths.size());
-      List<TSEncoding> encodings = new ArrayList<>(req.paths.size());
+      List<TSDataType> dataTypes = new ArrayList<>(req.dataTypes.size());
+      List<TSEncoding> encodings = new ArrayList<>(req.dataTypes.size());
       List<CompressionType> compressors = new ArrayList<>(req.paths.size());
       List<String> alias = null;
       if (req.measurementAliasList != null) {
@@ -1687,8 +1687,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         }
 
         paths.add(new PartialPath(req.paths.get(i)));
-        dataTypes.add(TSDataType.values()[req.dataTypes.get(i)]);
-        encodings.add(TSEncoding.values()[req.encodings.get(i)]);
         compressors.add(CompressionType.values()[req.compressors.get(i)]);
         if (alias != null) {
           alias.add(req.measurementAliasList.get(i));
@@ -1703,6 +1701,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
           attributes.add(req.attributesList.get(i));
         }
       }
+      for (int i = 0; i < req.dataTypes.size(); i++) {
+        dataTypes.add(TSDataType.values()[req.dataTypes.get(i)]);
+        encodings.add(TSEncoding.values()[req.encodings.get(i)]);
+      }
 
       multiPlan.setPaths(paths);
       multiPlan.setDataTypes(dataTypes);
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java
index 9a3eee3..e4f570e 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java
@@ -255,6 +255,32 @@ public class PhysicalPlanSerializeTest {
   }
 
   @Test
+  public void createMuSerializeTest3() throws IOException, IllegalPathException {
+    // same as:
+    // create timeseries root.sg.d1.s0 with datatype=DOUBLE, encoding=GORILLA, compression=SNAPPY
+    // create aligned timeseries root.sg.d1.(s1 INT64, s2 DOUBLE, s3 INT64)
+    // with encoding=(GORILLA, GORILLA, GORILLA), compression=SNAPPY
+    CreateMultiTimeSeriesPlan plan = new CreateMultiTimeSeriesPlan();
+    plan.setPaths(
+        Arrays.asList(new PartialPath("root.sg.d1.s0"), new PartialPath("root.sg.d1.(s1,s2,s3)")));
+    plan.setDataTypes(
+        Arrays.asList(TSDataType.DOUBLE, TSDataType.INT64, TSDataType.DOUBLE, TSDataType.INT64));
+    plan.setEncodings(
+        Arrays.asList(
+            TSEncoding.GORILLA, TSEncoding.GORILLA, TSEncoding.GORILLA, TSEncoding.GORILLA));
+    plan.setCompressors(Arrays.asList(CompressionType.SNAPPY, CompressionType.SNAPPY));
+    plan.setProps(null);
+    plan.setTags(null);
+    plan.setAttributes(null);
+    plan.setAlias(null);
+
+    PhysicalPlan result = testTwoSerializeMethodAndDeserialize(plan);
+
+    Assert.assertEquals(OperatorType.CREATE_MULTI_TIMESERIES, result.getOperatorType());
+    Assert.assertEquals(plan, result);
+  }
+
+  @Test
   public void AlterTimeSeriesPlanSerializeTest() throws IOException, IllegalPathException {
     AlterTimeSeriesPlan alterTimeSeriesPlan =
         new AlterTimeSeriesPlan(
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 9167d0a..b829cd0 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -438,13 +438,13 @@ public class Session {
 
     request.setPaths(paths);
 
-    List<Integer> dataTypeOrdinals = new ArrayList<>(paths.size());
+    List<Integer> dataTypeOrdinals = new ArrayList<>(dataTypes.size());
     for (TSDataType dataType : dataTypes) {
       dataTypeOrdinals.add(dataType.ordinal());
     }
     request.setDataTypes(dataTypeOrdinals);
 
-    List<Integer> encodingOrdinals = new ArrayList<>(paths.size());
+    List<Integer> encodingOrdinals = new ArrayList<>(dataTypes.size());
     for (TSEncoding encoding : encodings) {
       encodingOrdinals.add(encoding.ordinal());
     }