You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/03/17 08:41:07 UTC
[iotdb] branch Vector updated: Add measurementId in
VectorMeasurementSchema
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch Vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/Vector by this push:
new 0b8d79d Add measurementId in VectorMeasurementSchema
new b79d5a7 Merge remote-tracking branch 'origin/Vector' into Vector
0b8d79d is described below
commit 0b8d79d964cce36c5649f438b0c61ce3236cfaf4
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Wed Mar 17 16:25:40 2021 +0800
Add measurementId in VectorMeasurementSchema
---
.../org/apache/iotdb/db/metadata/MManager.java | 72 ++++++++++--------
.../java/org/apache/iotdb/db/metadata/MTree.java | 19 ++++-
.../iotdb/db/metadata/template/Template.java | 20 +++--
.../db/engine/memtable/MemTableTestUtils.java | 5 +-
.../db/engine/memtable/PrimitiveMemTableTest.java | 5 +-
.../iotdb/db/metadata/MManagerBasicTest.java | 87 ++++++++++++++++++----
.../apache/iotdb/session/IoTDBSessionSimpleIT.java | 3 +-
.../write/schema/VectorMeasurementSchema.java | 27 ++++++-
8 files changed, 178 insertions(+), 60 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index e8e1d91..3d39b60 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,35 +18,8 @@
*/
package org.apache.iotdb.db.metadata;
-import static java.util.stream.Collectors.toList;
-import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -105,9 +78,39 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
/**
* This class takes the responsibility of serialization of all the metadata info and persistent it
* into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -783,6 +786,9 @@ public class MManager {
return TSDataType.INT64;
}
+ if (path instanceof VectorPartialPath) {
+ return TSDataType.VECTOR;
+ }
IMeasurementSchema schema = mtree.getSchema(path);
if (schema instanceof MeasurementSchema) {
return schema.getType();
@@ -792,6 +798,10 @@ public class MManager {
}
}
+ public TSDataType getSeriesType(VectorPartialPath path) {
+ return TSDataType.VECTOR;
+ }
+
public MeasurementMNode[] getMNodes(PartialPath deviceId, String[] measurements)
throws MetadataException {
MNode deviceNode = getNodeByPath(deviceId);
@@ -1119,7 +1129,11 @@ public class MManager {
encodings[i] = schema.getValueTSEncodingList().get(index);
}
return new VectorMeasurementSchema(
- measurementIndices, types, encodings, schema.getCompressor());
+ IoTDBConstant.ALIGN_TIMESERIES_PREFIX,
+ measurementIndices,
+ types,
+ encodings,
+ schema.getCompressor());
}
if (leaf != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 30225c8..d19c1dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -351,6 +351,7 @@ public class MTree implements Serializable {
cur,
leafName,
new VectorMeasurementSchema(
+ leafName,
measurements.toArray(new String[measurementsSize]),
dataTypes.toArray(new TSDataType[measurementsSize]),
encodings.toArray(new TSEncoding[measurementsSize]),
@@ -1240,8 +1241,15 @@ public class MTree implements Serializable {
addMeasurementSchema(
node, timeseriesSchemaList, needLast, queryContext, measurementSchema, "*");
} else if (measurementSchema instanceof VectorMeasurementSchema) {
+ String lastWord = nodes[nodes.length - 1];
addVectorMeasurementSchema(
- node, timeseriesSchemaList, needLast, queryContext, measurementSchema, "*");
+ node,
+ timeseriesSchemaList,
+ needLast,
+ queryContext,
+ measurementSchema,
+ "*",
+ nodes.length == idx ? lastWord : null);
}
if (hasLimit) {
count.set(count.get() + 1);
@@ -1316,7 +1324,8 @@ public class MTree implements Serializable {
needLast,
queryContext,
schema,
- nodeReg);
+ nodeReg,
+ null);
}
}
}
@@ -1354,12 +1363,16 @@ public class MTree implements Serializable {
boolean needLast,
QueryContext queryContext,
IMeasurementSchema schema,
- String reg)
+ String reg,
+ String lastMeasurement)
throws StorageGroupNotSetException, IllegalPathException {
List<String> measurements = schema.getValueMeasurementIdList();
int measurementSize = measurements.size();
for (int i = 0; i < measurementSize; i++) {
if (Pattern.matches(reg.replace("*", ".*"), measurements.get(i))) {
+ if (lastMeasurement != null && !lastMeasurement.contains(measurements.get(i))) {
+ continue;
+ }
PartialPath devicePath = node.getPartialPath().getDevicePath();
String[] tsRow = new String[7];
tsRow[0] = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
index c9f0f9d..19b9551 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
@@ -18,12 +18,7 @@
*/
package org.apache.iotdb.db.metadata.template;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
@@ -33,6 +28,13 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
public class Template {
String name;
@@ -59,7 +61,11 @@ public class Template {
curSchema =
new VectorMeasurementSchema(
- measurementsArray, typeArray, encodingArray, plan.getCompressors().get(i));
+ IoTDBConstant.ALIGN_TIMESERIES_PREFIX,
+ measurementsArray,
+ typeArray,
+ encodingArray,
+ plan.getCompressors().get(i));
}
// normal measurement
else {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
index d74c375..b0c84ed 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.engine.memtable;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
@@ -89,7 +90,9 @@ public class MemTableTestUtils {
encodings[1] = TSEncoding.GORILLA;
MeasurementMNode[] mNodes = new MeasurementMNode[2];
- IMeasurementSchema schema = new VectorMeasurementSchema(measurements, dataTypes, encodings);
+ IMeasurementSchema schema =
+ new VectorMeasurementSchema(
+ IoTDBConstant.ALIGN_TIMESERIES_PREFIX, measurements, dataTypes, encodings);
mNodes[0] = new MeasurementMNode(null, "sensor0", schema, null);
mNodes[1] = new MeasurementMNode(null, "sensor1", schema, null);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index 0f56da6..7332705 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.engine.memtable;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -297,7 +298,9 @@ public class PrimitiveMemTableTest {
String deviceId = "root.sg.device5";
MeasurementMNode[] mNodes = new MeasurementMNode[2];
- IMeasurementSchema schema = new VectorMeasurementSchema(measurements, dataTypes, encodings);
+ IMeasurementSchema schema =
+ new VectorMeasurementSchema(
+ IoTDBConstant.ALIGN_TIMESERIES_PREFIX, measurements, dataTypes, encodings);
mNodes[0] = new MeasurementMNode(null, "sensor0", schema, null);
mNodes[1] = new MeasurementMNode(null, "sensor1", schema, null);
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index 4de11a9..be9f68a 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -18,20 +18,6 @@
*/
package org.apache.iotdb.db.metadata;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -40,6 +26,9 @@ import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -47,11 +36,27 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class MManagerBasicTest {
private CompressionType compressionType;
@@ -1069,4 +1074,58 @@ public class MManagerBasicTest {
e.getMessage());
}
}
+
+ @Test
+ public void testShowTimeseries() {
+ MManager manager = IoTDB.metaManager;
+ try {
+ manager.createTimeseries(
+ new PartialPath("root.laptop.d1.s0"),
+ TSDataType.valueOf("INT32"),
+ TSEncoding.valueOf("RLE"),
+ compressionType,
+ Collections.emptyMap());
+ manager.createAlignedTimeSeries(
+ new PartialPath("root.laptop.d1"),
+ Arrays.asList("s1", "s2", "s3"),
+ Arrays.asList(
+ TSDataType.valueOf("INT32"),
+ TSDataType.valueOf("FLOAT"),
+ TSDataType.valueOf("INT32")),
+ Arrays.asList(
+ TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE"), TSEncoding.valueOf("RLE")),
+ compressionType);
+
+ // show timeseries root.laptop.d1.s0
+ ShowTimeSeriesPlan showTimeSeriesPlan =
+ new ShowTimeSeriesPlan(
+ new PartialPath("root.laptop.d1.s0"), false, null, null, 0, 0, false);
+ List<ShowTimeSeriesResult> result =
+ manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
+ assertEquals(1, result.size());
+ assertEquals("root.laptop.d1.s0", result.get(0).getName());
+
+ // show timeseries root.laptop.d1.s1
+ showTimeSeriesPlan =
+ new ShowTimeSeriesPlan(
+ new PartialPath("root.laptop.d1.s1"), false, null, null, 0, 0, false);
+ result = manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
+ assertEquals(1, result.size());
+ assertEquals("root.laptop.d1.s1", result.get(0).getName());
+
+ // show timeseries root.laptop.d1.(s1,s2,s3)
+ showTimeSeriesPlan =
+ new ShowTimeSeriesPlan(
+ new PartialPath("root.laptop.d1.(s1,s2,s3)"), false, null, null, 0, 0, false);
+ result = manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
+ assertEquals(3, result.size());
+ for (int i = 0; i < result.size(); i++) {
+ assertEquals("root.laptop.d1.s" + (i + 1), result.get(i).getName());
+ }
+
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
index ed72527..5db9303 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
@@ -109,7 +110,7 @@ public class IoTDBSessionSimpleIT {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open();
- List<MeasurementSchema> schemaList = new ArrayList<>();
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
schemaList.add(new MeasurementSchema("s2", TSDataType.DOUBLE));
schemaList.add(new MeasurementSchema("s3", TSDataType.TEXT));
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
index 50cf110..cdb9533 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
@@ -42,6 +42,9 @@ import java.util.Objects;
public class VectorMeasurementSchema
implements IMeasurementSchema, Comparable<VectorMeasurementSchema>, Serializable {
+ public static final String ALIGN_TIMESERIES_PREFIX = "$#$";
+
+ private String meausurementId;
private String[] measurements;
private byte[] types;
private byte[] encodings;
@@ -51,10 +54,12 @@ public class VectorMeasurementSchema
public VectorMeasurementSchema() {}
public VectorMeasurementSchema(
+ String measurementId,
String[] measurements,
TSDataType[] types,
TSEncoding[] encodings,
CompressionType compressionType) {
+ this.meausurementId = measurementId;
this.measurements = measurements;
byte[] typesInByte = new byte[types.length];
for (int i = 0; i < types.length; i++) {
@@ -100,14 +105,18 @@ public class VectorMeasurementSchema
}
public VectorMeasurementSchema(
- String[] measurements, TSDataType[] types, TSEncoding[] encodings) {
+ String measurementId, String[] measurements, TSDataType[] types, TSEncoding[] encodings) {
this(
- measurements, types, encodings, TSFileDescriptor.getInstance().getConfig().getCompressor());
+ measurementId,
+ measurements,
+ types,
+ encodings,
+ TSFileDescriptor.getInstance().getConfig().getCompressor());
}
@Override
public String getMeasurementId() {
- return measurements[0] + ".align";
+ return meausurementId;
}
@Override
@@ -196,6 +205,8 @@ public class VectorMeasurementSchema
@Override
public int serializeTo(ByteBuffer buffer) {
int byteLen = 0;
+ byteLen +=
+ ReadWriteIOUtils.write(meausurementId.substring(ALIGN_TIMESERIES_PREFIX.length()), buffer);
byteLen += ReadWriteIOUtils.write(measurements.length, buffer);
for (String measurementId : measurements) {
@@ -215,6 +226,9 @@ public class VectorMeasurementSchema
@Override
public int serializeTo(OutputStream outputStream) throws IOException {
int byteLen = 0;
+ byteLen +=
+ ReadWriteIOUtils.write(
+ meausurementId.substring(ALIGN_TIMESERIES_PREFIX.length()), outputStream);
byteLen += ReadWriteIOUtils.write(measurements.length, outputStream);
for (String measurementId : measurements) {
@@ -234,6 +248,9 @@ public class VectorMeasurementSchema
public static VectorMeasurementSchema deserializeFrom(InputStream inputStream)
throws IOException {
VectorMeasurementSchema vectorMeasurementSchema = new VectorMeasurementSchema();
+ vectorMeasurementSchema.meausurementId =
+ ALIGN_TIMESERIES_PREFIX + ReadWriteIOUtils.readInt(inputStream);
+
int measurementSize = ReadWriteIOUtils.readInt(inputStream);
String[] measurements = new String[measurementSize];
for (int i = 0; i < measurementSize; i++) {
@@ -257,8 +274,10 @@ public class VectorMeasurementSchema
return vectorMeasurementSchema;
}
- public static VectorMeasurementSchema deserializeFrom(ByteBuffer buffer) throws IOException {
+ public static VectorMeasurementSchema deserializeFrom(ByteBuffer buffer) {
VectorMeasurementSchema vectorMeasurementSchema = new VectorMeasurementSchema();
+ vectorMeasurementSchema.meausurementId =
+ ALIGN_TIMESERIES_PREFIX + ReadWriteIOUtils.readInt(buffer);
int measurementSize = ReadWriteIOUtils.readInt(buffer);
String[] measurements = new String[measurementSize];
for (int i = 0; i < measurementSize; i++) {