You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/03/16 02:54:17 UTC

[iotdb] branch createMultiTimeseries created (now 979c2a6)

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a change to branch createMultiTimeseries
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 979c2a6  [IOTDB-1235] Refactor createMultiTimeseries

This branch includes the following new commits:

     new 979c2a6  [IOTDB-1235] Refactor createMultiTimeseries

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-1235] Refactor createMultiTimeseries

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch createMultiTimeseries
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 979c2a61523ce46062105a500d89aa46d58d1c22
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Tue Mar 16 10:53:36 2021 +0800

    [IOTDB-1235] Refactor createMultiTimeseries
---
 .../java/org/apache/iotdb/db/metadata/MTree.java   |  2 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 59 +++++++++++++++++-----
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java |  7 ++-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 10 ++--
 .../db/qp/physical/PhysicalPlanSerializeTest.java  | 26 ++++++++++
 .../java/org/apache/iotdb/session/Session.java     |  4 +-
 6 files changed, 85 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 42bb0ac..389071f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -1339,7 +1339,7 @@ public class MTree implements Serializable {
         tsRow[2] = schema.getValueTSDataTypeList().get(i).toString();
         tsRow[3] = schema.getValueTSEncodingList().get(i).toString();
         tsRow[4] = schema.getCompressor().toString();
-        tsRow[5] = "0";
+        tsRow[5] = "-1";
         tsRow[6] =
             needLast
                 ? String.valueOf(getLastTimeStamp((MeasurementMNode) node, queryContext))
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index b97343c..75a2369 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.MetaUtils;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
@@ -129,6 +130,7 @@ import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -1376,24 +1378,53 @@ public class PlanExecutor implements IPlanExecutor {
 
   private boolean createMultiTimeSeries(CreateMultiTimeSeriesPlan multiPlan)
       throws BatchProcessException {
+    int dataTypeIdx = 0;
     for (int i = 0; i < multiPlan.getPaths().size(); i++) {
       if (multiPlan.getResults().containsKey(i)) {
         continue;
       }
-      CreateTimeSeriesPlan plan =
-          new CreateTimeSeriesPlan(
-              multiPlan.getPaths().get(i),
-              multiPlan.getDataTypes().get(i),
-              multiPlan.getEncodings().get(i),
-              multiPlan.getCompressors().get(i),
-              multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
-              multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
-              multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
-              multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
-      try {
-        createTimeSeries(plan);
-      } catch (QueryProcessException e) {
-        multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      PartialPath path = multiPlan.getPaths().get(i);
+      String measurement = path.getMeasurement();
+      if (measurement.contains("(") && measurement.contains(",")) {
+        PartialPath devicePath = path.getDevicePath();
+        List<String> measurements = MetaUtils.getMeasurementsInPartialPath(path);
+        List<TSDataType> dataTypes = new ArrayList<>();
+        List<TSEncoding> encodings = new ArrayList<>();
+        for (int j = 0; j < measurements.size(); j++) {
+          dataTypes.add(multiPlan.getDataTypes().get(dataTypeIdx));
+          encodings.add(multiPlan.getEncodings().get(dataTypeIdx));
+          dataTypeIdx++;
+        }
+        CreateAlignedTimeSeriesPlan plan =
+            new CreateAlignedTimeSeriesPlan(
+                devicePath,
+                measurements,
+                dataTypes,
+                encodings,
+                multiPlan.getCompressors().get(i),
+                Collections.emptyList());
+        try {
+          createAlignedTimeSeries(plan);
+        } catch (QueryProcessException e) {
+          multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+        }
+      } else {
+        CreateTimeSeriesPlan plan =
+            new CreateTimeSeriesPlan(
+                multiPlan.getPaths().get(i),
+                multiPlan.getDataTypes().get(i),
+                multiPlan.getEncodings().get(i),
+                multiPlan.getCompressors().get(i),
+                multiPlan.getProps() == null ? null : multiPlan.getProps().get(i),
+                multiPlan.getTags() == null ? null : multiPlan.getTags().get(i),
+                multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i),
+                multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i));
+        dataTypeIdx++;
+        try {
+          createTimeSeries(plan);
+        } catch (QueryProcessException e) {
+          multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+        }
       }
     }
     if (!multiPlan.getResults().isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
index 9ebc54b..ad7284f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
@@ -152,6 +152,7 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
     int type = PhysicalPlanType.CREATE_MULTI_TIMESERIES.ordinal();
     stream.write(type);
     stream.writeInt(paths.size());
+    stream.writeInt(dataTypes.size()); // size of datatypes, encodings for aligned timeseries
 
     for (PartialPath path : paths) {
       putString(stream, path.getFullPath());
@@ -209,6 +210,7 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
     int type = PhysicalPlanType.CREATE_MULTI_TIMESERIES.ordinal();
     buffer.put((byte) type);
     buffer.putInt(paths.size());
+    buffer.putInt(dataTypes.size()); // size of datatypes, encodings for aligned timeseries
 
     for (PartialPath path : paths) {
       putString(buffer, path.getFullPath());
@@ -264,16 +266,17 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
   @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     int totalSize = buffer.getInt();
+    int dataTypeSize = buffer.getInt();
     paths = new ArrayList<>(totalSize);
     for (int i = 0; i < totalSize; i++) {
       paths.add(new PartialPath(readString(buffer)));
     }
     dataTypes = new ArrayList<>(totalSize);
-    for (int i = 0; i < totalSize; i++) {
+    for (int i = 0; i < dataTypeSize; i++) {
       dataTypes.add(TSDataType.values()[buffer.get()]);
     }
     encodings = new ArrayList<>(totalSize);
-    for (int i = 0; i < totalSize; i++) {
+    for (int i = 0; i < dataTypeSize; i++) {
       encodings.add(TSEncoding.values()[buffer.get()]);
     }
     compressors = new ArrayList<>(totalSize);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 81a95e7..02f0621 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1655,8 +1655,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
       CreateMultiTimeSeriesPlan multiPlan = new CreateMultiTimeSeriesPlan();
       List<PartialPath> paths = new ArrayList<>(req.paths.size());
-      List<TSDataType> dataTypes = new ArrayList<>(req.paths.size());
-      List<TSEncoding> encodings = new ArrayList<>(req.paths.size());
+      List<TSDataType> dataTypes = new ArrayList<>(req.dataTypes.size());
+      List<TSEncoding> encodings = new ArrayList<>(req.dataTypes.size());
       List<CompressionType> compressors = new ArrayList<>(req.paths.size());
       List<String> alias = null;
       if (req.measurementAliasList != null) {
@@ -1687,8 +1687,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         }
 
         paths.add(new PartialPath(req.paths.get(i)));
-        dataTypes.add(TSDataType.values()[req.dataTypes.get(i)]);
-        encodings.add(TSEncoding.values()[req.encodings.get(i)]);
         compressors.add(CompressionType.values()[req.compressors.get(i)]);
         if (alias != null) {
           alias.add(req.measurementAliasList.get(i));
@@ -1703,6 +1701,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
           attributes.add(req.attributesList.get(i));
         }
       }
+      for (int i = 0; i < req.dataTypes.size(); i++) {
+        dataTypes.add(TSDataType.values()[req.dataTypes.get(i)]);
+        encodings.add(TSEncoding.values()[req.encodings.get(i)]);
+      }
 
       multiPlan.setPaths(paths);
       multiPlan.setDataTypes(dataTypes);
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java
index 9a3eee3..e4f570e 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java
@@ -255,6 +255,32 @@ public class PhysicalPlanSerializeTest {
   }
 
   @Test
+  public void createMuSerializeTest3() throws IOException, IllegalPathException {
+    // same as:
+    // create timeseries root.sg.d1.s0 with datatype=DOUBLE, encoding=GORILLA, compression=SNAPPY
+    // create aligned timeseries root.sg.d1.(s1 INT64, s2 DOUBLE, s3 INT64)
+    // with encoding=(GORILLA, GORILLA, GORILLA), compression=SNAPPY
+    CreateMultiTimeSeriesPlan plan = new CreateMultiTimeSeriesPlan();
+    plan.setPaths(
+        Arrays.asList(new PartialPath("root.sg.d1.s0"), new PartialPath("root.sg.d1.(s1,s2,s3)")));
+    plan.setDataTypes(
+        Arrays.asList(TSDataType.DOUBLE, TSDataType.INT64, TSDataType.DOUBLE, TSDataType.INT64));
+    plan.setEncodings(
+        Arrays.asList(
+            TSEncoding.GORILLA, TSEncoding.GORILLA, TSEncoding.GORILLA, TSEncoding.GORILLA));
+    plan.setCompressors(Arrays.asList(CompressionType.SNAPPY, CompressionType.SNAPPY));
+    plan.setProps(null);
+    plan.setTags(null);
+    plan.setAttributes(null);
+    plan.setAlias(null);
+
+    PhysicalPlan result = testTwoSerializeMethodAndDeserialize(plan);
+
+    Assert.assertEquals(OperatorType.CREATE_MULTI_TIMESERIES, result.getOperatorType());
+    Assert.assertEquals(plan, result);
+  }
+
+  @Test
   public void AlterTimeSeriesPlanSerializeTest() throws IOException, IllegalPathException {
     AlterTimeSeriesPlan alterTimeSeriesPlan =
         new AlterTimeSeriesPlan(
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 9167d0a..b829cd0 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -438,13 +438,13 @@ public class Session {
 
     request.setPaths(paths);
 
-    List<Integer> dataTypeOrdinals = new ArrayList<>(paths.size());
+    List<Integer> dataTypeOrdinals = new ArrayList<>(dataTypes.size());
     for (TSDataType dataType : dataTypes) {
       dataTypeOrdinals.add(dataType.ordinal());
     }
     request.setDataTypes(dataTypeOrdinals);
 
-    List<Integer> encodingOrdinals = new ArrayList<>(paths.size());
+    List<Integer> encodingOrdinals = new ArrayList<>(dataTypes.size());
     for (TSEncoding encoding : encodings) {
       encodingOrdinals.add(encoding.ordinal());
     }