You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/05/12 13:16:50 UTC

[iotdb] branch support_vector_align_by_device_query_in_cluster created (now 3bdac50)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch support_vector_align_by_device_query_in_cluster
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 3bdac50  support

This branch includes the following new commits:

     new 3bdac50  support

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: support

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch support_vector_align_by_device_query_in_cluster
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3bdac507905487dbe4483220032b49e12561ac3a
Author: LebronAl <TX...@gmail.com>
AuthorDate: Wed May 12 21:16:00 2021 +0800

    support
---
 .../iotdb/cluster/query/ClusterDataQueryExecutor.java    | 12 ++++++++++--
 .../iotdb/cluster/query/reader/ClusterReaderFactory.java | 16 ++++++++++++++--
 .../query/reader/mult/RemoteMultSeriesReader.java        | 12 +++++++++++-
 .../reader/chunk/metadata/DiskChunkMetadataLoader.java   |  2 +-
 .../apache/iotdb/tsfile/file/metadata/ChunkMetadata.java |  7 ++++++-
 .../iotdb/tsfile/file/metadata/IChunkMetadata.java       |  2 ++
 .../iotdb/tsfile/file/metadata/VectorChunkMetadata.java  | 14 ++++++++++++++
 7 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index f28f088..168fc2c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter;
@@ -129,10 +130,17 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
     for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
       PartialPath partialPath = queryPlan.getDeduplicatedPaths().get(i);
       TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
+      String fullPath = partialPath.getFullPath();
+      if (partialPath instanceof VectorPartialPath) {
+        VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
+        if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
+          fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath();
+        }
+      }
       AssignPathManagedMergeReader assignPathManagedMergeReader =
-          new AssignPathManagedMergeReader(partialPath.getFullPath(), dataType);
+          new AssignPathManagedMergeReader(fullPath, dataType);
       for (AbstractMultPointReader multPointReader : multPointReaders) {
-        if (multPointReader.getAllPaths().contains(partialPath.getFullPath())) {
+        if (multPointReader.getAllPaths().contains(fullPath)) {
           assignPathManagedMergeReader.addReader(multPointReader, 0);
         }
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 261f5cd..eeee119 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -1004,8 +1004,20 @@ public class ClusterReaderFactory {
               context,
               dataGroupMember.getHeader(),
               ascending);
-      partialPathBatchReaderMap.put(
-          partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader));
+      if (partialPath instanceof VectorPartialPath) {
+        VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
+        if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
+          partialPathBatchReaderMap.put(
+              vectorPartialPath.getSubSensorsPathList().get(0).getFullPath(),
+              new SeriesRawDataBatchReader(seriesReader));
+        } else {
+          partialPathBatchReaderMap.put(
+              partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader));
+        }
+      } else { // common path
+        partialPathBatchReaderMap.put(
+            partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader));
+      }
     }
     return new MultBatchReader(partialPathBatchReaderMap);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
index 9c09bcb..8b9a61a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
 import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -73,7 +75,15 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
     this.cachedBatchs = Maps.newHashMap();
     this.pathToDataType = Maps.newHashMap();
     for (int i = 0; i < sourceInfo.getPartialPaths().size(); i++) {
-      String fullPath = sourceInfo.getPartialPaths().get(i).getFullPath();
+
+      PartialPath partialPath = sourceInfo.getPartialPaths().get(i);
+      String fullPath = partialPath.getFullPath();
+      if (partialPath instanceof VectorPartialPath) {
+        VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
+        if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
+          fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath();
+        }
+      }
       this.cachedBatchs.put(fullPath, new ConcurrentLinkedQueue<>());
       this.pathToDataType.put(fullPath, sourceInfo.getDataTypes().get(i));
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
index 2d457ac..63c8c5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
@@ -117,7 +117,7 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader {
     // very cheap.
     chunkMetadataList.forEach(
         chunkMetadata -> {
-          if (chunkMetadata.getChunkLoader() == null) {
+          if (chunkMetadata.needSetChunkLoader()) {
             chunkMetadata.setFilePath(resource.getTsFilePath());
             chunkMetadata.setClosed(resource.isClosed());
             chunkMetadata.setChunkLoader(new DiskChunkLoader(context));
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 642f285..8e4f782 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -58,7 +58,7 @@ public class ChunkMetadata implements Accountable, IChunkMetadata {
   private boolean modified;
 
   /** ChunkLoader of metadata, used to create ChunkReaderWrap */
-  private IChunkLoader chunkLoader;
+  private volatile IChunkLoader chunkLoader;
 
   private Statistics statistics;
 
@@ -218,6 +218,11 @@ public class ChunkMetadata implements Accountable, IChunkMetadata {
     return chunkLoader;
   }
 
+  @Override
+  public boolean needSetChunkLoader() {
+    return chunkLoader == null;
+  }
+
   public void setChunkLoader(IChunkLoader chunkLoader) {
     this.chunkLoader = chunkLoader;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
index 1930340..ace5afa 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
@@ -54,6 +54,8 @@ public interface IChunkMetadata {
 
   IChunkLoader getChunkLoader();
 
+  boolean needSetChunkLoader();
+
   void setChunkLoader(IChunkLoader chunkLoader);
 
   void setFilePath(String filePath);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
index 974d9d6..01f617a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
@@ -103,6 +103,20 @@ public class VectorChunkMetadata implements IChunkMetadata {
   }
 
   @Override
+  public boolean needSetChunkLoader() {
+    if (timeChunkMetadata.needSetChunkLoader()) {
+      return true;
+    } else {
+      for (IChunkMetadata chunkMetadata : valueChunkMetadataList) {
+        if (chunkMetadata.needSetChunkLoader()) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
   public void setChunkLoader(IChunkLoader chunkLoader) {
     timeChunkMetadata.setChunkLoader(chunkLoader);
     for (IChunkMetadata chunkMetadata : valueChunkMetadataList) {