You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/05/13 06:59:36 UTC
[iotdb] 01/01: fix
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch fix_concurrent_cluster_query_bug_and_align_by_device_query_locally
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4d9f0eb57f1bbcb4aa7747fd8de47c55de30982b
Author: LebronAl <TX...@gmail.com>
AuthorDate: Thu May 13 14:58:40 2021 +0800
fix
---
.../cluster/query/ClusterDataQueryExecutor.java | 9 +--------
.../cluster/query/reader/ClusterReaderFactory.java | 23 ++++------------------
.../query/reader/mult/RemoteMultSeriesReader.java | 15 +++-----------
.../org/apache/iotdb/db/metadata/PartialPath.java | 15 ++++++++++++++
.../iotdb/db/query/reader/series/SeriesReader.java | 6 +++---
5 files changed, 26 insertions(+), 42 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index 168fc2c..8c94a66 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter;
@@ -130,13 +129,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
PartialPath partialPath = queryPlan.getDeduplicatedPaths().get(i);
TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
- String fullPath = partialPath.getFullPath();
- if (partialPath instanceof VectorPartialPath) {
- VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
- if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
- fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath();
- }
- }
+ String fullPath = PartialPath.getExactFullPath(partialPath);
AssignPathManagedMergeReader assignPathManagedMergeReader =
new AssignPathManagedMergeReader(fullPath, dataType);
for (AbstractMultPointReader multPointReader : multPointReaders) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index eeee119..eb47005 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -316,7 +316,7 @@ public class ClusterReaderFactory {
context,
dataGroupMember,
ascending);
- partialPathPointReaderMap.put(partialPath.getFullPath(), seriesPointReader);
+ partialPathPointReaderMap.put(PartialPath.getExactFullPath(partialPath), seriesPointReader);
}
if (logger.isDebugEnabled()) {
@@ -578,10 +578,7 @@ public class ClusterReaderFactory {
Set<String> fullPaths = Sets.newHashSet();
dataSourceInfo
.getPartialPaths()
- .forEach(
- partialPath -> {
- fullPaths.add(partialPath.getFullPath());
- });
+ .forEach(partialPath -> fullPaths.add(partialPath.getFullPath()));
return new MultEmptyReader(fullPaths);
}
throw new StorageEngineException(
@@ -1004,20 +1001,8 @@ public class ClusterReaderFactory {
context,
dataGroupMember.getHeader(),
ascending);
- if (partialPath instanceof VectorPartialPath) {
- VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
- if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
- partialPathBatchReaderMap.put(
- vectorPartialPath.getSubSensorsPathList().get(0).getFullPath(),
- new SeriesRawDataBatchReader(seriesReader));
- } else {
- partialPathBatchReaderMap.put(
- partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader));
- }
- } else { // common path
- partialPathBatchReaderMap.put(
- partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader));
- }
+ partialPathBatchReaderMap.put(
+ PartialPath.getExactFullPath(partialPath), new SeriesRawDataBatchReader(seriesReader));
}
return new MultBatchReader(partialPathBatchReaderMap);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
index 8b9a61a..d608d7e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.utils.SerializeUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -75,22 +74,14 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
this.cachedBatchs = Maps.newHashMap();
this.pathToDataType = Maps.newHashMap();
for (int i = 0; i < sourceInfo.getPartialPaths().size(); i++) {
-
- PartialPath partialPath = sourceInfo.getPartialPaths().get(i);
- String fullPath = partialPath.getFullPath();
- if (partialPath instanceof VectorPartialPath) {
- VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
- if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
- fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath();
- }
- }
+ String fullPath = PartialPath.getExactFullPath(sourceInfo.getPartialPaths().get(i));
this.cachedBatchs.put(fullPath, new ConcurrentLinkedQueue<>());
this.pathToDataType.put(fullPath, sourceInfo.getDataTypes().get(i));
}
}
@Override
- public boolean hasNextTimeValuePair(String fullPath) throws IOException {
+ public synchronized boolean hasNextTimeValuePair(String fullPath) throws IOException {
BatchData batchData = currentBatchDatas.get(fullPath);
if (batchData != null && batchData.hasCurrent()) {
return true;
@@ -108,7 +99,7 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
}
@Override
- public TimeValuePair nextTimeValuePair(String fullPath) throws IOException {
+ public synchronized TimeValuePair nextTimeValuePair(String fullPath) throws IOException {
BatchData batchData = currentBatchDatas.get(fullPath);
if ((batchData == null || !batchData.hasCurrent()) && checkPathBatchData(fullPath)) {
batchData = cachedBatchs.get(fullPath).poll();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
index 79d075f..54e4362 100755
--- a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
@@ -312,4 +312,19 @@ public class PartialPath extends Path implements Comparable<Path> {
}
return ret;
}
+
+ /**
+ * If the partialPath is VectorPartialPath and it has only one sub sensor, return the sub sensor's
+ * full path. Otherwise, return the partialPath's fullPath
+ */
+ public static String getExactFullPath(PartialPath partialPath) {
+ String fullPath = partialPath.getFullPath();
+ if (partialPath instanceof VectorPartialPath) {
+ VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
+ if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
+ fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath();
+ }
+ }
+ return fullPath;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 3181d10..f6ac803 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -208,7 +208,7 @@ public class SeriesReader {
|| firstPageReader != null
|| mergeReader.hasNextTimeValuePair()) {
throw new IOException(
- "all cached pages should be consumed first cachedPageReaders.isEmpty() is "
+ "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
+ unSeqPageReaders.isEmpty()
+ " firstPageReader != null is "
+ (firstPageReader != null)
@@ -269,7 +269,7 @@ public class SeriesReader {
|| firstPageReader != null
|| mergeReader.hasNextTimeValuePair()) {
throw new IOException(
- "all cached pages should be consumed first cachedPageReaders.isEmpty() is "
+ "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
+ unSeqPageReaders.isEmpty()
+ " firstPageReader != null is "
+ (firstPageReader != null)
@@ -430,7 +430,7 @@ public class SeriesReader {
return true;
}
- // make sure firstPageReader won't be null while the cachedPageReaders has more cached page
+ // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page
// readers
while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {