You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/05/13 05:23:36 UTC

[iotdb] branch master updated: support (#3172)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f6e636b  support (#3172)
f6e636b is described below

commit f6e636b2f5e276da1c80e16fb37078259b822ee1
Author: Potato <TX...@gmail.com>
AuthorDate: Thu May 13 13:22:53 2021 +0800

    support (#3172)
    
    Support align by device query for vector in cluster module
---
 .../iotdb/cluster/query/ClusterDataQueryExecutor.java    | 12 ++++++++++--
 .../iotdb/cluster/query/reader/ClusterReaderFactory.java | 16 ++++++++++++++--
 .../query/reader/mult/RemoteMultSeriesReader.java        | 12 +++++++++++-
 .../reader/chunk/metadata/DiskChunkMetadataLoader.java   |  2 +-
 .../apache/iotdb/tsfile/file/metadata/ChunkMetadata.java |  5 +++++
 .../iotdb/tsfile/file/metadata/IChunkMetadata.java       |  2 ++
 .../iotdb/tsfile/file/metadata/VectorChunkMetadata.java  | 14 ++++++++++++++
 7 files changed, 57 insertions(+), 6 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index f28f088..168fc2c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter;
@@ -129,10 +130,17 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
     for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
       PartialPath partialPath = queryPlan.getDeduplicatedPaths().get(i);
       TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
+      String fullPath = partialPath.getFullPath();
+      if (partialPath instanceof VectorPartialPath) {
+        VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
+        if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
+          fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath();
+        }
+      }
       AssignPathManagedMergeReader assignPathManagedMergeReader =
-          new AssignPathManagedMergeReader(partialPath.getFullPath(), dataType);
+          new AssignPathManagedMergeReader(fullPath, dataType);
       for (AbstractMultPointReader multPointReader : multPointReaders) {
-        if (multPointReader.getAllPaths().contains(partialPath.getFullPath())) {
+        if (multPointReader.getAllPaths().contains(fullPath)) {
           assignPathManagedMergeReader.addReader(multPointReader, 0);
         }
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 261f5cd..eeee119 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -1004,8 +1004,20 @@ public class ClusterReaderFactory {
               context,
               dataGroupMember.getHeader(),
               ascending);
-      partialPathBatchReaderMap.put(
-          partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader));
+      if (partialPath instanceof VectorPartialPath) {
+        VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
+        if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
+          partialPathBatchReaderMap.put(
+              vectorPartialPath.getSubSensorsPathList().get(0).getFullPath(),
+              new SeriesRawDataBatchReader(seriesReader));
+        } else {
+          partialPathBatchReaderMap.put(
+              partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader));
+        }
+      } else { // common path
+        partialPathBatchReaderMap.put(
+            partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader));
+      }
     }
     return new MultBatchReader(partialPathBatchReaderMap);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
index 9c09bcb..8b9a61a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
 import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -73,7 +75,15 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
     this.cachedBatchs = Maps.newHashMap();
     this.pathToDataType = Maps.newHashMap();
     for (int i = 0; i < sourceInfo.getPartialPaths().size(); i++) {
-      String fullPath = sourceInfo.getPartialPaths().get(i).getFullPath();
+
+      PartialPath partialPath = sourceInfo.getPartialPaths().get(i);
+      String fullPath = partialPath.getFullPath();
+      if (partialPath instanceof VectorPartialPath) {
+        VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
+        if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
+          fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath();
+        }
+      }
       this.cachedBatchs.put(fullPath, new ConcurrentLinkedQueue<>());
       this.pathToDataType.put(fullPath, sourceInfo.getDataTypes().get(i));
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
index 2d457ac..63c8c5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
@@ -117,7 +117,7 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader {
     // very cheap.
     chunkMetadataList.forEach(
         chunkMetadata -> {
-          if (chunkMetadata.getChunkLoader() == null) {
+          if (chunkMetadata.needSetChunkLoader()) {
             chunkMetadata.setFilePath(resource.getTsFilePath());
             chunkMetadata.setClosed(resource.isClosed());
             chunkMetadata.setChunkLoader(new DiskChunkLoader(context));
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 642f285..59d74fd 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -218,6 +218,11 @@ public class ChunkMetadata implements Accountable, IChunkMetadata {
     return chunkLoader;
   }
 
+  @Override
+  public boolean needSetChunkLoader() {
+    return chunkLoader == null; // a loader is needed only while none is assigned
+  }
+
   public void setChunkLoader(IChunkLoader chunkLoader) {
     this.chunkLoader = chunkLoader;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
index 1930340..ace5afa 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
@@ -54,6 +54,8 @@ public interface IChunkMetadata {
 
   IChunkLoader getChunkLoader();
 
+  boolean needSetChunkLoader();
+
   void setChunkLoader(IChunkLoader chunkLoader);
 
   void setFilePath(String filePath);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
index 974d9d6..01f617a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
@@ -103,6 +103,20 @@ public class VectorChunkMetadata implements IChunkMetadata {
   }
 
   @Override
+  public boolean needSetChunkLoader() {
+    // True if the time chunk or any value chunk still needs a loader.
+    if (timeChunkMetadata.needSetChunkLoader()) {
+      return true;
+    }
+    for (IChunkMetadata valueChunkMetadata : valueChunkMetadataList) {
+      if (valueChunkMetadata.needSetChunkLoader()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
   public void setChunkLoader(IChunkLoader chunkLoader) {
     timeChunkMetadata.setChunkLoader(chunkLoader);
     for (IChunkMetadata chunkMetadata : valueChunkMetadataList) {