You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/11/01 02:52:10 UTC
[iotdb] 01/02: add VectorSeriesReader
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c95efc9d1381c1419a6768bad9cf7c12831dd3f6
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Fri Oct 29 16:06:17 2021 +0800
add VectorSeriesReader
---
.../cluster/query/reader/ClusterReaderFactory.java | 11 +---
.../org/apache/iotdb/db/metadata/PartialPath.java | 71 +++++++++++++++++-----
.../iotdb/db/metadata/VectorPartialPath.java | 52 ++++++++++++++--
.../query/reader/series/SeriesAggregateReader.java | 13 +---
.../reader/series/SeriesRawDataBatchReader.java | 32 +++-------
.../iotdb/db/query/reader/series/SeriesReader.java | 36 ++++++-----
.../reader/series/SeriesReaderByTimestamp.java | 13 +---
.../reader/series/VectorSeriesAggregateReader.java | 2 +-
.../db/query/reader/series/VectorSeriesReader.java | 65 ++++++++++++++++++++
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 7 +--
10 files changed, 207 insertions(+), 95 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 1de13a2..9a35b61 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -560,16 +560,7 @@ public class ClusterReaderFactory {
QueryDataSource queryDataSource =
QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
valueFilter = queryDataSource.updateFilterUsingTTL(valueFilter);
- return new SeriesReader(
- path,
- allSensors,
- dataType,
- context,
- queryDataSource,
- timeFilter,
- valueFilter,
- new SlotTsFileFilter(requiredSlots),
- ascending);
+ return path.createSeriesReader(allSensors, dataType, context, queryDataSource, timeFilter, valueFilter, new SlotTsFileFilter(requiredSlots), ascending);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
index c052161..b4cf014 100755
--- a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
@@ -18,25 +18,31 @@
*/
package org.apache.iotdb.db.metadata;
+import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.SeriesReader;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
-
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-
/**
* A prefix path, suffix path or fullPath generated from SQL. Usually used in the IoTDB server
* module
@@ -49,7 +55,9 @@ public class PartialPath extends Path implements Comparable<Path> {
// alias of measurement, null pointer cannot be serialized in thrift so empty string is instead
protected String measurementAlias = "";
- public PartialPath() {}
+ public PartialPath() {
+ }
+
/**
* Construct the PartialPath using a String, will split the given String into String[] E.g., path
* = "root.sg.\"d.1\".\"s.1\"" nodes = {"root", "sg", "\"d.1\"", "\"s.1\""}
@@ -67,15 +75,17 @@ public class PartialPath extends Path implements Comparable<Path> {
this.nodes = MetaUtils.splitPathToDetachedPath(fullPath);
}
- /** @param partialNodes nodes of a time series path */
+ /**
+ * @param partialNodes nodes of a time series path
+ */
public PartialPath(String[] partialNodes) {
nodes = partialNodes;
}
/**
- * @param path path
+ * @param path path
* @param needSplit needSplit is basically false, whether need to be split to device and
- * measurement, doesn't support escape character yet.
+ * measurement, doesn't support escape character yet.
*/
public PartialPath(String path, boolean needSplit) {
super(path, needSplit);
@@ -358,4 +368,37 @@ public class PartialPath extends Path implements Comparable<Path> {
public String getExactFullPath() {
return getFullPath();
}
+
+ public SeriesReader createSeriesReader(Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ QueryDataSource dataSource,
+ Filter timeFilter,
+ Filter valueFilter,
+ TsFileFilter fileFilter,
+ boolean ascending) {
+ return new SeriesReader(
+ this,
+ allSensors,
+ dataType,
+ context,
+ dataSource,
+ timeFilter,
+ valueFilter,
+ fileFilter,
+ ascending);
+ }
+
+ @TestOnly
+ public SeriesReader createSeriesReader(Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ List<TsFileResource> seqFileResource,
+ List<TsFileResource> unseqFileResource,
+ Filter timeFilter,
+ Filter valueFilter,
+ boolean ascending) {
+ return new SeriesReader(this, allSensors, dataType, context, seqFileResource, unseqFileResource,
+ timeFilter, valueFilter, ascending);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java
index 82b81a1..93fcd3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java
@@ -19,12 +19,20 @@
package org.apache.iotdb.db.metadata;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.VectorSeriesReader;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
/**
* VectorPartialPath represents a vector's fullPath. It not only contains the full path of vector's
@@ -36,7 +44,8 @@ public class VectorPartialPath extends PartialPath {
private List<String> subSensorsList;
- public VectorPartialPath() {}
+ public VectorPartialPath() {
+ }
public VectorPartialPath(String vectorPath, List<String> subSensorsList)
throws IllegalPathException {
@@ -119,4 +128,39 @@ public class VectorPartialPath extends PartialPath {
}
return fullPath;
}
+
+ @Override
+ public VectorSeriesReader createSeriesReader(Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ QueryDataSource dataSource,
+ Filter timeFilter,
+ Filter valueFilter,
+ TsFileFilter fileFilter,
+ boolean ascending) {
+ return new VectorSeriesReader(
+ this,
+ allSensors,
+ dataType,
+ context,
+ dataSource,
+ timeFilter,
+ valueFilter,
+ fileFilter,
+ ascending);
+ }
+
+ @Override
+ @TestOnly
+ public VectorSeriesReader createSeriesReader(Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ List<TsFileResource> seqFileResource,
+ List<TsFileResource> unseqFileResource,
+ Filter timeFilter,
+ Filter valueFilter,
+ boolean ascending) {
+ return new VectorSeriesReader(this, allSensors, dataType, context, seqFileResource,
+ unseqFileResource, timeFilter, valueFilter, ascending);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
index 04cb907..91f161f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
@@ -44,17 +44,8 @@ public class SeriesAggregateReader implements IAggregateReader {
Filter valueFilter,
TsFileFilter fileFilter,
boolean ascending) {
- this.seriesReader =
- new SeriesReader(
- seriesPath,
- allSensors,
- dataType,
- context,
- dataSource,
- timeFilter,
- valueFilter,
- fileFilter,
- ascending);
+ this.seriesReader = seriesPath.createSeriesReader(allSensors, dataType, context, dataSource,
+ timeFilter, valueFilter, fileFilter, ascending);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
index 6af9c4e..c944ca4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
@@ -57,17 +57,8 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
Filter valueFilter,
TsFileFilter fileFilter,
boolean ascending) {
- this.seriesReader =
- new SeriesReader(
- seriesPath,
- allSensors,
- dataType,
- context,
- dataSource,
- timeFilter,
- valueFilter,
- fileFilter,
- ascending);
+ this.seriesReader = seriesPath.createSeriesReader(allSensors, dataType, context, dataSource,
+ timeFilter, valueFilter, fileFilter, ascending);
}
@TestOnly
@@ -83,17 +74,14 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
boolean ascending) {
Set<String> allSensors = new HashSet<>();
allSensors.add(seriesPath.getMeasurement());
- this.seriesReader =
- new SeriesReader(
- seriesPath,
- allSensors,
- dataType,
- context,
- seqFileResource,
- unseqFileResource,
- timeFilter,
- valueFilter,
- ascending);
+ this.seriesReader = seriesPath.createSeriesReader(allSensors,
+ dataType,
+ context,
+ seqFileResource,
+ unseqFileResource,
+ timeFilter,
+ valueFilter,
+ ascending);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index c2af73e..16a508c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -18,8 +18,16 @@
*/
package org.apache.iotdb.db.query.reader.series;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collectors;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.PartialPath;
@@ -47,20 +55,8 @@ import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.reader.page.VectorPageReader;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.function.ToLongFunction;
-import java.util.stream.Collectors;
-
public class SeriesReader {
- private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
// inner class of SeriesReader for order purpose
protected TimeOrderUtils orderUtils;
@@ -185,7 +181,7 @@ public class SeriesReader {
@TestOnly
@SuppressWarnings("squid:S107")
- SeriesReader(
+ public SeriesReader(
PartialPath seriesPath,
Set<String> allSensors,
TSDataType dataType,
@@ -1028,7 +1024,7 @@ public class SeriesReader {
protected void unpackUnseqTsFileResource() throws IOException {
ITimeSeriesMetadata timeseriesMetadata =
- FileLoaderUtils.loadTimeSeriesMetadata(
+ loadTimeSeriesMetadata(
unseqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors);
if (timeseriesMetadata != null) {
timeseriesMetadata.setModified(true);
@@ -1037,6 +1033,14 @@ public class SeriesReader {
}
}
+ protected ITimeSeriesMetadata loadTimeSeriesMetadata(TsFileResource resource,
+ PartialPath seriesPath,
+ QueryContext context,
+ Filter filter,
+ Set<String> allSensors) throws IOException {
+ return FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, filter, allSensors);
+ }
+
protected Filter getAnyFilter() {
return timeFilter != null ? timeFilter : valueFilter;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index 27796ff..bd68bed 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -47,17 +47,8 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
boolean ascending) {
UnaryFilter timeFilter =
ascending ? TimeFilter.gtEq(Long.MIN_VALUE) : TimeFilter.ltEq(Long.MAX_VALUE);
- this.seriesReader =
- new SeriesReader(
- seriesPath,
- allSensors,
- dataType,
- context,
- dataSource,
- timeFilter,
- null,
- fileFilter,
- ascending);
+ this.seriesReader = seriesPath.createSeriesReader(allSensors, dataType, context, dataSource,
+ timeFilter, null, fileFilter, ascending);
this.ascending = ascending;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java
index 23b1243..8db2bd2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java
@@ -53,7 +53,7 @@ public class VectorSeriesAggregateReader implements IAggregateReader {
TsFileFilter fileFilter,
boolean ascending) {
this.seriesReader =
- new SeriesReader(
+ new VectorSeriesReader(
seriesPath,
allSensors,
dataType,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java
new file mode 100644
index 0000000..db47258
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.reader.series;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+public class VectorSeriesReader extends SeriesReader {
+
+ public VectorSeriesReader(PartialPath seriesPath,
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ QueryDataSource dataSource,
+ Filter timeFilter,
+ Filter valueFilter,
+ TsFileFilter fileFilter, boolean ascending) {
+ super(seriesPath, allSensors, dataType, context, dataSource, timeFilter, valueFilter,
+ fileFilter,
+ ascending);
+ }
+
+ public VectorSeriesReader(PartialPath seriesPath, Set<String> allSensors,
+ TSDataType dataType, QueryContext context,
+ List<TsFileResource> seqFileResource,
+ List<TsFileResource> unseqFileResource,
+ Filter timeFilter, Filter valueFilter, boolean ascending) {
+ super(seriesPath, allSensors, dataType, context, seqFileResource, unseqFileResource, timeFilter,
+ valueFilter, ascending);
+ }
+
+ @Override
+ protected ITimeSeriesMetadata loadTimeSeriesMetadata(TsFileResource resource,
+ PartialPath seriesPath, QueryContext context, Filter filter, Set<String> allSensors)
+ throws IOException {
+ return FileLoaderUtils.loadTimeSeriesMetadata(resource, (VectorPartialPath) seriesPath, context, filter, allSensors);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 3d2931b..9626991 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -99,11 +99,6 @@ public class FileLoaderUtils {
Filter filter,
Set<String> allSensors)
throws IOException {
- // deal with vector
- if (seriesPath instanceof VectorPartialPath) {
- return loadVectorTimeSeriesMetadata(
- resource, (VectorPartialPath) seriesPath, context, filter, allSensors);
- }
// common path
ITimeSeriesMetadata timeSeriesMetadata;
@@ -159,7 +154,7 @@ public class FileLoaderUtils {
* [root.sg1.d1.vector.s1, root.sg1.d1.vector.s2])
* @param allSensors all sensors belonging to this device that appear in query
*/
- private static VectorTimeSeriesMetadata loadVectorTimeSeriesMetadata(
+ public static VectorTimeSeriesMetadata loadTimeSeriesMetadata(
TsFileResource resource,
VectorPartialPath vectorPath,
QueryContext context,