You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/11/01 02:52:10 UTC

[iotdb] 01/02: add VectorSeriesReader

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c95efc9d1381c1419a6768bad9cf7c12831dd3f6
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Fri Oct 29 16:06:17 2021 +0800

    add VectorSeriesReader
---
 .../cluster/query/reader/ClusterReaderFactory.java | 11 +---
 .../org/apache/iotdb/db/metadata/PartialPath.java  | 71 +++++++++++++++++-----
 .../iotdb/db/metadata/VectorPartialPath.java       | 52 ++++++++++++++--
 .../query/reader/series/SeriesAggregateReader.java | 13 +---
 .../reader/series/SeriesRawDataBatchReader.java    | 32 +++-------
 .../iotdb/db/query/reader/series/SeriesReader.java | 36 ++++++-----
 .../reader/series/SeriesReaderByTimestamp.java     | 13 +---
 .../reader/series/VectorSeriesAggregateReader.java |  2 +-
 .../db/query/reader/series/VectorSeriesReader.java | 65 ++++++++++++++++++++
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |  7 +--
 10 files changed, 207 insertions(+), 95 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 1de13a2..9a35b61 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -560,16 +560,7 @@ public class ClusterReaderFactory {
     QueryDataSource queryDataSource =
         QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
     valueFilter = queryDataSource.updateFilterUsingTTL(valueFilter);
-    return new SeriesReader(
-        path,
-        allSensors,
-        dataType,
-        context,
-        queryDataSource,
-        timeFilter,
-        valueFilter,
-        new SlotTsFileFilter(requiredSlots),
-        ascending);
+    return path.createSeriesReader(allSensors, dataType, context, queryDataSource, timeFilter, valueFilter, new SlotTsFileFilter(requiredSlots), ascending);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
index c052161..b4cf014 100755
--- a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
@@ -18,25 +18,31 @@
  */
 package org.apache.iotdb.db.metadata;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
 import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.SeriesReader;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
-
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-
 /**
  * A prefix path, suffix path or fullPath generated from SQL. Usually used in the IoTDB server
  * module
@@ -49,7 +55,9 @@ public class PartialPath extends Path implements Comparable<Path> {
   // alias of measurement, null pointer cannot be serialized in thrift so empty string is instead
   protected String measurementAlias = "";
 
-  public PartialPath() {}
+  public PartialPath() {
+  }
+
   /**
    * Construct the PartialPath using a String, will split the given String into String[] E.g., path
    * = "root.sg.\"d.1\".\"s.1\"" nodes = {"root", "sg", "\"d.1\"", "\"s.1\""}
@@ -67,15 +75,17 @@ public class PartialPath extends Path implements Comparable<Path> {
     this.nodes = MetaUtils.splitPathToDetachedPath(fullPath);
   }
 
-  /** @param partialNodes nodes of a time series path */
+  /**
+   * @param partialNodes nodes of a time series path
+   */
   public PartialPath(String[] partialNodes) {
     nodes = partialNodes;
   }
 
   /**
-   * @param path path
+   * @param path      path
    * @param needSplit needSplit is basically false, whether need to be split to device and
-   *     measurement, doesn't support escape character yet.
+   *                  measurement, doesn't support escape character yet.
    */
   public PartialPath(String path, boolean needSplit) {
     super(path, needSplit);
@@ -358,4 +368,37 @@ public class PartialPath extends Path implements Comparable<Path> {
   public String getExactFullPath() {
     return getFullPath();
   }
+
+  public SeriesReader createSeriesReader(Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      QueryDataSource dataSource,
+      Filter timeFilter,
+      Filter valueFilter,
+      TsFileFilter fileFilter,
+      boolean ascending) {
+    return new SeriesReader(
+        this,
+        allSensors,
+        dataType,
+        context,
+        dataSource,
+        timeFilter,
+        valueFilter,
+        fileFilter,
+        ascending);
+  }
+
+  @TestOnly
+  public SeriesReader createSeriesReader(Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      List<TsFileResource> seqFileResource,
+      List<TsFileResource> unseqFileResource,
+      Filter timeFilter,
+      Filter valueFilter,
+      boolean ascending) {
+    return new SeriesReader(this, allSensors, dataType, context, seqFileResource, unseqFileResource,
+        timeFilter, valueFilter, ascending);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java
index 82b81a1..93fcd3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/VectorPartialPath.java
@@ -19,12 +19,20 @@
 
 package org.apache.iotdb.db.metadata;
 
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.VectorSeriesReader;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 /**
  * VectorPartialPath represents a vector's fullPath. It not only contains the full path of vector's
@@ -36,7 +44,8 @@ public class VectorPartialPath extends PartialPath {
 
   private List<String> subSensorsList;
 
-  public VectorPartialPath() {}
+  public VectorPartialPath() {
+  }
 
   public VectorPartialPath(String vectorPath, List<String> subSensorsList)
       throws IllegalPathException {
@@ -119,4 +128,39 @@ public class VectorPartialPath extends PartialPath {
     }
     return fullPath;
   }
+
+  @Override
+  public VectorSeriesReader createSeriesReader(Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      QueryDataSource dataSource,
+      Filter timeFilter,
+      Filter valueFilter,
+      TsFileFilter fileFilter,
+      boolean ascending) {
+    return new VectorSeriesReader(
+        this,
+        allSensors,
+        dataType,
+        context,
+        dataSource,
+        timeFilter,
+        valueFilter,
+        fileFilter,
+        ascending);
+  }
+
+  @Override
+  @TestOnly
+  public VectorSeriesReader createSeriesReader(Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      List<TsFileResource> seqFileResource,
+      List<TsFileResource> unseqFileResource,
+      Filter timeFilter,
+      Filter valueFilter,
+      boolean ascending) {
+    return new VectorSeriesReader(this, allSensors, dataType, context, seqFileResource,
+        unseqFileResource, timeFilter, valueFilter, ascending);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
index 04cb907..91f161f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
@@ -44,17 +44,8 @@ public class SeriesAggregateReader implements IAggregateReader {
       Filter valueFilter,
       TsFileFilter fileFilter,
       boolean ascending) {
-    this.seriesReader =
-        new SeriesReader(
-            seriesPath,
-            allSensors,
-            dataType,
-            context,
-            dataSource,
-            timeFilter,
-            valueFilter,
-            fileFilter,
-            ascending);
+    this.seriesReader = seriesPath.createSeriesReader(allSensors, dataType, context, dataSource,
+        timeFilter, valueFilter, fileFilter, ascending);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
index 6af9c4e..c944ca4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
@@ -57,17 +57,8 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
       Filter valueFilter,
       TsFileFilter fileFilter,
       boolean ascending) {
-    this.seriesReader =
-        new SeriesReader(
-            seriesPath,
-            allSensors,
-            dataType,
-            context,
-            dataSource,
-            timeFilter,
-            valueFilter,
-            fileFilter,
-            ascending);
+    this.seriesReader = seriesPath.createSeriesReader(allSensors, dataType, context, dataSource,
+        timeFilter, valueFilter, fileFilter, ascending);
   }
 
   @TestOnly
@@ -83,17 +74,14 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
       boolean ascending) {
     Set<String> allSensors = new HashSet<>();
     allSensors.add(seriesPath.getMeasurement());
-    this.seriesReader =
-        new SeriesReader(
-            seriesPath,
-            allSensors,
-            dataType,
-            context,
-            seqFileResource,
-            unseqFileResource,
-            timeFilter,
-            valueFilter,
-            ascending);
+    this.seriesReader = seriesPath.createSeriesReader(allSensors,
+        dataType,
+        context,
+        seqFileResource,
+        unseqFileResource,
+        timeFilter,
+        valueFilter,
+        ascending);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index c2af73e..16a508c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -18,8 +18,16 @@
  */
 package org.apache.iotdb.db.query.reader.series;
 
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collectors;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.metadata.PartialPath;
@@ -47,20 +55,8 @@ import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
 import org.apache.iotdb.tsfile.read.reader.page.VectorPageReader;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.function.ToLongFunction;
-import java.util.stream.Collectors;
-
 public class SeriesReader {
 
-  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
   // inner class of SeriesReader for order purpose
   protected TimeOrderUtils orderUtils;
 
@@ -185,7 +181,7 @@ public class SeriesReader {
 
   @TestOnly
   @SuppressWarnings("squid:S107")
-  SeriesReader(
+  public SeriesReader(
       PartialPath seriesPath,
       Set<String> allSensors,
       TSDataType dataType,
@@ -1028,7 +1024,7 @@ public class SeriesReader {
 
   protected void unpackUnseqTsFileResource() throws IOException {
     ITimeSeriesMetadata timeseriesMetadata =
-        FileLoaderUtils.loadTimeSeriesMetadata(
+        loadTimeSeriesMetadata(
             unseqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors);
     if (timeseriesMetadata != null) {
       timeseriesMetadata.setModified(true);
@@ -1037,6 +1033,14 @@ public class SeriesReader {
     }
   }
 
+  protected ITimeSeriesMetadata loadTimeSeriesMetadata(TsFileResource resource,
+      PartialPath seriesPath,
+      QueryContext context,
+      Filter filter,
+      Set<String> allSensors) throws IOException {
+    return FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, filter, allSensors);
+  }
+
   protected Filter getAnyFilter() {
     return timeFilter != null ? timeFilter : valueFilter;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index 27796ff..bd68bed 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -47,17 +47,8 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
       boolean ascending) {
     UnaryFilter timeFilter =
         ascending ? TimeFilter.gtEq(Long.MIN_VALUE) : TimeFilter.ltEq(Long.MAX_VALUE);
-    this.seriesReader =
-        new SeriesReader(
-            seriesPath,
-            allSensors,
-            dataType,
-            context,
-            dataSource,
-            timeFilter,
-            null,
-            fileFilter,
-            ascending);
+    this.seriesReader = seriesPath.createSeriesReader(allSensors, dataType, context, dataSource,
+        timeFilter, null, fileFilter, ascending);
     this.ascending = ascending;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java
index 23b1243..8db2bd2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java
@@ -53,7 +53,7 @@ public class VectorSeriesAggregateReader implements IAggregateReader {
       TsFileFilter fileFilter,
       boolean ascending) {
     this.seriesReader =
-        new SeriesReader(
+        new VectorSeriesReader(
             seriesPath,
             allSensors,
             dataType,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java
new file mode 100644
index 0000000..db47258
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.reader.series;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+public class VectorSeriesReader extends SeriesReader {
+
+  public VectorSeriesReader(PartialPath seriesPath,
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      QueryDataSource dataSource,
+      Filter timeFilter,
+      Filter valueFilter,
+      TsFileFilter fileFilter, boolean ascending) {
+    super(seriesPath, allSensors, dataType, context, dataSource, timeFilter, valueFilter,
+        fileFilter,
+        ascending);
+  }
+
+  public VectorSeriesReader(PartialPath seriesPath, Set<String> allSensors,
+      TSDataType dataType, QueryContext context,
+      List<TsFileResource> seqFileResource,
+      List<TsFileResource> unseqFileResource,
+      Filter timeFilter, Filter valueFilter, boolean ascending) {
+    super(seriesPath, allSensors, dataType, context, seqFileResource, unseqFileResource, timeFilter,
+        valueFilter, ascending);
+  }
+
+  @Override
+  protected ITimeSeriesMetadata loadTimeSeriesMetadata(TsFileResource resource,
+      PartialPath seriesPath, QueryContext context, Filter filter, Set<String> allSensors)
+      throws IOException {
+    return FileLoaderUtils.loadTimeSeriesMetadata(resource, (VectorPartialPath) seriesPath, context, filter, allSensors);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 3d2931b..9626991 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -99,11 +99,6 @@ public class FileLoaderUtils {
       Filter filter,
       Set<String> allSensors)
       throws IOException {
-    // deal with vector
-    if (seriesPath instanceof VectorPartialPath) {
-      return loadVectorTimeSeriesMetadata(
-          resource, (VectorPartialPath) seriesPath, context, filter, allSensors);
-    }
 
     // common path
     ITimeSeriesMetadata timeSeriesMetadata;
@@ -159,7 +154,7 @@ public class FileLoaderUtils {
    *     [root.sg1.d1.vector.s1, root.sg1.d1.vector.s2])
    * @param allSensors all sensors belonging to this device that appear in query
    */
-  private static VectorTimeSeriesMetadata loadVectorTimeSeriesMetadata(
+  public static VectorTimeSeriesMetadata loadTimeSeriesMetadata(
       TsFileResource resource,
       VectorPartialPath vectorPath,
       QueryContext context,