You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/05/13 05:23:36 UTC
[iotdb] branch master updated: support (#3172)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f6e636b support (#3172)
f6e636b is described below
commit f6e636b2f5e276da1c80e16fb37078259b822ee1
Author: Potato <TX...@gmail.com>
AuthorDate: Thu May 13 13:22:53 2021 +0800
support (#3172)
Support align by device query for vector in cluster module
---
.../iotdb/cluster/query/ClusterDataQueryExecutor.java | 12 ++++++++++--
.../iotdb/cluster/query/reader/ClusterReaderFactory.java | 16 ++++++++++++++--
.../query/reader/mult/RemoteMultSeriesReader.java | 12 +++++++++++-
.../reader/chunk/metadata/DiskChunkMetadataLoader.java | 2 +-
.../apache/iotdb/tsfile/file/metadata/ChunkMetadata.java | 5 +++++
.../iotdb/tsfile/file/metadata/IChunkMetadata.java | 2 ++
.../iotdb/tsfile/file/metadata/VectorChunkMetadata.java | 14 ++++++++++++++
7 files changed, 57 insertions(+), 6 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index f28f088..168fc2c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter;
@@ -129,10 +130,17 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
PartialPath partialPath = queryPlan.getDeduplicatedPaths().get(i);
TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
+ String fullPath = partialPath.getFullPath();
+ if (partialPath instanceof VectorPartialPath) {
+ VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
+ if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
+ fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath();
+ }
+ }
AssignPathManagedMergeReader assignPathManagedMergeReader =
- new AssignPathManagedMergeReader(partialPath.getFullPath(), dataType);
+ new AssignPathManagedMergeReader(fullPath, dataType);
for (AbstractMultPointReader multPointReader : multPointReaders) {
- if (multPointReader.getAllPaths().contains(partialPath.getFullPath())) {
+ if (multPointReader.getAllPaths().contains(fullPath)) {
assignPathManagedMergeReader.addReader(multPointReader, 0);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 261f5cd..eeee119 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -1004,8 +1004,20 @@ public class ClusterReaderFactory {
context,
dataGroupMember.getHeader(),
ascending);
- partialPathBatchReaderMap.put(
- partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader));
+ if (partialPath instanceof VectorPartialPath) {
+ VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
+ if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
+ partialPathBatchReaderMap.put(
+ vectorPartialPath.getSubSensorsPathList().get(0).getFullPath(),
+ new SeriesRawDataBatchReader(seriesReader));
+ } else {
+ partialPathBatchReaderMap.put(
+ partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader));
+ }
+ } else { // common path
+ partialPathBatchReaderMap.put(
+ partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader));
+ }
}
return new MultBatchReader(partialPathBatchReaderMap);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
index 9c09bcb..8b9a61a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.utils.SerializeUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -73,7 +75,15 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
this.cachedBatchs = Maps.newHashMap();
this.pathToDataType = Maps.newHashMap();
for (int i = 0; i < sourceInfo.getPartialPaths().size(); i++) {
- String fullPath = sourceInfo.getPartialPaths().get(i).getFullPath();
+
+ PartialPath partialPath = sourceInfo.getPartialPaths().get(i);
+ String fullPath = partialPath.getFullPath();
+ if (partialPath instanceof VectorPartialPath) {
+ VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
+ if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
+ fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath();
+ }
+ }
this.cachedBatchs.put(fullPath, new ConcurrentLinkedQueue<>());
this.pathToDataType.put(fullPath, sourceInfo.getDataTypes().get(i));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
index 2d457ac..63c8c5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
@@ -117,7 +117,7 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader {
// very cheap.
chunkMetadataList.forEach(
chunkMetadata -> {
- if (chunkMetadata.getChunkLoader() == null) {
+ if (chunkMetadata.needSetChunkLoader()) {
chunkMetadata.setFilePath(resource.getTsFilePath());
chunkMetadata.setClosed(resource.isClosed());
chunkMetadata.setChunkLoader(new DiskChunkLoader(context));
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 642f285..59d74fd 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -218,6 +218,11 @@ public class ChunkMetadata implements Accountable, IChunkMetadata {
return chunkLoader;
}
+ @Override
+ public boolean needSetChunkLoader() {
+ return chunkLoader == null;
+ }
+
public void setChunkLoader(IChunkLoader chunkLoader) {
this.chunkLoader = chunkLoader;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
index 1930340..ace5afa 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
@@ -54,6 +54,8 @@ public interface IChunkMetadata {
IChunkLoader getChunkLoader();
+ boolean needSetChunkLoader();
+
void setChunkLoader(IChunkLoader chunkLoader);
void setFilePath(String filePath);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
index 974d9d6..01f617a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java
@@ -103,6 +103,20 @@ public class VectorChunkMetadata implements IChunkMetadata {
}
@Override
+ public boolean needSetChunkLoader() {
+ if (timeChunkMetadata.needSetChunkLoader()) {
+ return true;
+ } else {
+ for (IChunkMetadata chunkMetadata : valueChunkMetadataList) {
+ if (chunkMetadata.needSetChunkLoader()) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
public void setChunkLoader(IChunkLoader chunkLoader) {
timeChunkMetadata.setChunkLoader(chunkLoader);
for (IChunkMetadata chunkMetadata : valueChunkMetadataList) {