You are viewing a plain text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/03/17 08:41:07 UTC

[iotdb] branch Vector updated: Add measurementId in VectorMeasurementSchema

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch Vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/Vector by this push:
     new 0b8d79d  Add measurementId in VectorMeasurementSchema
     new b79d5a7  Merge remote-tracking branch 'origin/Vector' into Vector
0b8d79d is described below

commit 0b8d79d964cce36c5649f438b0c61ce3236cfaf4
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Wed Mar 17 16:25:40 2021 +0800

    Add measurementId in VectorMeasurementSchema
---
 .../org/apache/iotdb/db/metadata/MManager.java     | 72 ++++++++++--------
 .../java/org/apache/iotdb/db/metadata/MTree.java   | 19 ++++-
 .../iotdb/db/metadata/template/Template.java       | 20 +++--
 .../db/engine/memtable/MemTableTestUtils.java      |  5 +-
 .../db/engine/memtable/PrimitiveMemTableTest.java  |  5 +-
 .../iotdb/db/metadata/MManagerBasicTest.java       | 87 ++++++++++++++++++----
 .../apache/iotdb/session/IoTDBSessionSimpleIT.java |  3 +-
 .../write/schema/VectorMeasurementSchema.java      | 27 ++++++-
 8 files changed, 178 insertions(+), 60 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index e8e1d91..3d39b60 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,35 +18,8 @@
  */
 package org.apache.iotdb.db.metadata;
 
-import static java.util.stream.Collectors.toList;
-import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -105,9 +78,39 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
 import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
 /**
  * This class takes the responsibility of serialization of all the metadata info and persistent it
  * into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -783,6 +786,9 @@ public class MManager {
       return TSDataType.INT64;
     }
 
+    if (path instanceof VectorPartialPath) {
+      return TSDataType.VECTOR;
+    }
     IMeasurementSchema schema = mtree.getSchema(path);
     if (schema instanceof MeasurementSchema) {
       return schema.getType();
@@ -792,6 +798,10 @@ public class MManager {
     }
   }
 
+  public TSDataType getSeriesType(VectorPartialPath path) {
+    return TSDataType.VECTOR;
+  }
+
   public MeasurementMNode[] getMNodes(PartialPath deviceId, String[] measurements)
       throws MetadataException {
     MNode deviceNode = getNodeByPath(deviceId);
@@ -1119,7 +1129,11 @@ public class MManager {
         encodings[i] = schema.getValueTSEncodingList().get(index);
       }
       return new VectorMeasurementSchema(
-          measurementIndices, types, encodings, schema.getCompressor());
+          IoTDBConstant.ALIGN_TIMESERIES_PREFIX,
+          measurementIndices,
+          types,
+          encodings,
+          schema.getCompressor());
     }
 
     if (leaf != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 30225c8..d19c1dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -351,6 +351,7 @@ public class MTree implements Serializable {
               cur,
               leafName,
               new VectorMeasurementSchema(
+                  leafName,
                   measurements.toArray(new String[measurementsSize]),
                   dataTypes.toArray(new TSDataType[measurementsSize]),
                   encodings.toArray(new TSEncoding[measurementsSize]),
@@ -1240,8 +1241,15 @@ public class MTree implements Serializable {
         addMeasurementSchema(
             node, timeseriesSchemaList, needLast, queryContext, measurementSchema, "*");
       } else if (measurementSchema instanceof VectorMeasurementSchema) {
+        String lastWord = nodes[nodes.length - 1];
         addVectorMeasurementSchema(
-            node, timeseriesSchemaList, needLast, queryContext, measurementSchema, "*");
+            node,
+            timeseriesSchemaList,
+            needLast,
+            queryContext,
+            measurementSchema,
+            "*",
+            nodes.length == idx ? lastWord : null);
       }
       if (hasLimit) {
         count.set(count.get() + 1);
@@ -1316,7 +1324,8 @@ public class MTree implements Serializable {
                   needLast,
                   queryContext,
                   schema,
-                  nodeReg);
+                  nodeReg,
+                  null);
             }
           }
         }
@@ -1354,12 +1363,16 @@ public class MTree implements Serializable {
       boolean needLast,
       QueryContext queryContext,
       IMeasurementSchema schema,
-      String reg)
+      String reg,
+      String lastMeasurement)
       throws StorageGroupNotSetException, IllegalPathException {
     List<String> measurements = schema.getValueMeasurementIdList();
     int measurementSize = measurements.size();
     for (int i = 0; i < measurementSize; i++) {
       if (Pattern.matches(reg.replace("*", ".*"), measurements.get(i))) {
+        if (lastMeasurement != null && !lastMeasurement.contains(measurements.get(i))) {
+          continue;
+        }
         PartialPath devicePath = node.getPartialPath().getDevicePath();
         String[] tsRow = new String[7];
         tsRow[0] = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
index c9f0f9d..19b9551 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
@@ -18,12 +18,7 @@
  */
 package org.apache.iotdb.db.metadata.template;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
@@ -33,6 +28,13 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public class Template {
   String name;
 
@@ -59,7 +61,11 @@ public class Template {
 
         curSchema =
             new VectorMeasurementSchema(
-                measurementsArray, typeArray, encodingArray, plan.getCompressors().get(i));
+                IoTDBConstant.ALIGN_TIMESERIES_PREFIX,
+                measurementsArray,
+                typeArray,
+                encodingArray,
+                plan.getCompressors().get(i));
       }
       // normal measurement
       else {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
index d74c375..b0c84ed 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.engine.memtable;
 
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
@@ -89,7 +90,9 @@ public class MemTableTestUtils {
     encodings[1] = TSEncoding.GORILLA;
 
     MeasurementMNode[] mNodes = new MeasurementMNode[2];
-    IMeasurementSchema schema = new VectorMeasurementSchema(measurements, dataTypes, encodings);
+    IMeasurementSchema schema =
+        new VectorMeasurementSchema(
+            IoTDBConstant.ALIGN_TIMESERIES_PREFIX, measurements, dataTypes, encodings);
     mNodes[0] = new MeasurementMNode(null, "sensor0", schema, null);
     mNodes[1] = new MeasurementMNode(null, "sensor1", schema, null);
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index 0f56da6..7332705 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.engine.memtable;
 
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -297,7 +298,9 @@ public class PrimitiveMemTableTest {
     String deviceId = "root.sg.device5";
 
     MeasurementMNode[] mNodes = new MeasurementMNode[2];
-    IMeasurementSchema schema = new VectorMeasurementSchema(measurements, dataTypes, encodings);
+    IMeasurementSchema schema =
+        new VectorMeasurementSchema(
+            IoTDBConstant.ALIGN_TIMESERIES_PREFIX, measurements, dataTypes, encodings);
     mNodes[0] = new MeasurementMNode(null, "sensor0", schema, null);
     mNodes[1] = new MeasurementMNode(null, "sensor1", schema, null);
 
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index 4de11a9..be9f68a 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -18,20 +18,6 @@
  */
 package org.apache.iotdb.db.metadata;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -40,6 +26,9 @@ import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -47,11 +36,27 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class MManagerBasicTest {
 
   private CompressionType compressionType;
@@ -1069,4 +1074,58 @@ public class MManagerBasicTest {
           e.getMessage());
     }
   }
+
+  @Test
+  public void testShowTimeseries() {
+    MManager manager = IoTDB.metaManager;
+    try {
+      manager.createTimeseries(
+          new PartialPath("root.laptop.d1.s0"),
+          TSDataType.valueOf("INT32"),
+          TSEncoding.valueOf("RLE"),
+          compressionType,
+          Collections.emptyMap());
+      manager.createAlignedTimeSeries(
+          new PartialPath("root.laptop.d1"),
+          Arrays.asList("s1", "s2", "s3"),
+          Arrays.asList(
+              TSDataType.valueOf("INT32"),
+              TSDataType.valueOf("FLOAT"),
+              TSDataType.valueOf("INT32")),
+          Arrays.asList(
+              TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE")),
+          compressionType);
+
+      // show timeseries root.laptop.d1.s0
+      ShowTimeSeriesPlan showTimeSeriesPlan =
+          new ShowTimeSeriesPlan(
+              new PartialPath("root.laptop.d1.s0"), false, null, null, 0, 0, false);
+      List<ShowTimeSeriesResult> result =
+          manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
+      assertEquals(1, result.size());
+      assertEquals("root.laptop.d1.s0", result.get(0).getName());
+
+      // show timeseries root.laptop.d1.s1
+      showTimeSeriesPlan =
+          new ShowTimeSeriesPlan(
+              new PartialPath("root.laptop.d1.s1"), false, null, null, 0, 0, false);
+      result = manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
+      assertEquals(1, result.size());
+      assertEquals("root.laptop.d1.s1", result.get(0).getName());
+
+      // show timeseries root.laptop.d1.(s1,s2,s3)
+      showTimeSeriesPlan =
+          new ShowTimeSeriesPlan(
+              new PartialPath("root.laptop.d1.(s1,s2,s3)"), false, null, null, 0, 0, false);
+      result = manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
+      assertEquals(3, result.size());
+      for (int i = 0; i < result.size(); i++) {
+        assertEquals("root.laptop.d1.s" + (i + 1), result.get(i).getName());
+      }
+
+    } catch (MetadataException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
index ed72527..5db9303 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.After;
@@ -109,7 +110,7 @@ public class IoTDBSessionSimpleIT {
     session = new Session("127.0.0.1", 6667, "root", "root");
     session.open();
 
-    List<MeasurementSchema> schemaList = new ArrayList<>();
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
     schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
     schemaList.add(new MeasurementSchema("s2", TSDataType.DOUBLE));
     schemaList.add(new MeasurementSchema("s3", TSDataType.TEXT));
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
index 50cf110..cdb9533 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
@@ -42,6 +42,9 @@ import java.util.Objects;
 public class VectorMeasurementSchema
     implements IMeasurementSchema, Comparable<VectorMeasurementSchema>, Serializable {
 
+  public static final String ALIGN_TIMESERIES_PREFIX = "$#$";
+
+  private String meausurementId;
   private String[] measurements;
   private byte[] types;
   private byte[] encodings;
@@ -51,10 +54,12 @@ public class VectorMeasurementSchema
   public VectorMeasurementSchema() {}
 
   public VectorMeasurementSchema(
+      String measurementId,
       String[] measurements,
       TSDataType[] types,
       TSEncoding[] encodings,
       CompressionType compressionType) {
+    this.meausurementId = measurementId;
     this.measurements = measurements;
     byte[] typesInByte = new byte[types.length];
     for (int i = 0; i < types.length; i++) {
@@ -100,14 +105,18 @@ public class VectorMeasurementSchema
   }
 
   public VectorMeasurementSchema(
-      String[] measurements, TSDataType[] types, TSEncoding[] encodings) {
+      String measurementId, String[] measurements, TSDataType[] types, TSEncoding[] encodings) {
     this(
-        measurements, types, encodings, TSFileDescriptor.getInstance().getConfig().getCompressor());
+        measurementId,
+        measurements,
+        types,
+        encodings,
+        TSFileDescriptor.getInstance().getConfig().getCompressor());
   }
 
   @Override
   public String getMeasurementId() {
-    return measurements[0] + ".align";
+    return meausurementId;
   }
 
   @Override
@@ -196,6 +205,8 @@ public class VectorMeasurementSchema
   @Override
   public int serializeTo(ByteBuffer buffer) {
     int byteLen = 0;
+    byteLen +=
+        ReadWriteIOUtils.write(meausurementId.substring(ALIGN_TIMESERIES_PREFIX.length()), buffer);
     byteLen += ReadWriteIOUtils.write(measurements.length, buffer);
 
     for (String measurementId : measurements) {
@@ -215,6 +226,9 @@ public class VectorMeasurementSchema
   @Override
   public int serializeTo(OutputStream outputStream) throws IOException {
     int byteLen = 0;
+    byteLen +=
+        ReadWriteIOUtils.write(
+            meausurementId.substring(ALIGN_TIMESERIES_PREFIX.length()), outputStream);
     byteLen += ReadWriteIOUtils.write(measurements.length, outputStream);
 
     for (String measurementId : measurements) {
@@ -234,6 +248,9 @@ public class VectorMeasurementSchema
   public static VectorMeasurementSchema deserializeFrom(InputStream inputStream)
       throws IOException {
     VectorMeasurementSchema vectorMeasurementSchema = new VectorMeasurementSchema();
+    vectorMeasurementSchema.meausurementId =
+        ALIGN_TIMESERIES_PREFIX + ReadWriteIOUtils.readInt(inputStream);
+
     int measurementSize = ReadWriteIOUtils.readInt(inputStream);
     String[] measurements = new String[measurementSize];
     for (int i = 0; i < measurementSize; i++) {
@@ -257,8 +274,10 @@ public class VectorMeasurementSchema
     return vectorMeasurementSchema;
   }
 
-  public static VectorMeasurementSchema deserializeFrom(ByteBuffer buffer) throws IOException {
+  public static VectorMeasurementSchema deserializeFrom(ByteBuffer buffer) {
     VectorMeasurementSchema vectorMeasurementSchema = new VectorMeasurementSchema();
+    vectorMeasurementSchema.meausurementId =
+        ALIGN_TIMESERIES_PREFIX + ReadWriteIOUtils.readInt(buffer);
     int measurementSize = ReadWriteIOUtils.readInt(buffer);
     String[] measurements = new String[measurementSize];
     for (int i = 0; i < measurementSize; i++) {