You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/05/13 06:59:36 UTC

[iotdb] 01/01: fix

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch fix_concurrent_cluster_query_bug_and_align_by_device_query_locally
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4d9f0eb57f1bbcb4aa7747fd8de47c55de30982b
Author: LebronAl <TX...@gmail.com>
AuthorDate: Thu May 13 14:58:40 2021 +0800

    fix
---
 .../cluster/query/ClusterDataQueryExecutor.java    |  9 +--------
 .../cluster/query/reader/ClusterReaderFactory.java | 23 ++++------------------
 .../query/reader/mult/RemoteMultSeriesReader.java  | 15 +++-----------
 .../org/apache/iotdb/db/metadata/PartialPath.java  | 15 ++++++++++++++
 .../iotdb/db/query/reader/series/SeriesReader.java |  6 +++---
 5 files changed, 26 insertions(+), 42 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index 168fc2c..8c94a66 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.VectorPartialPath;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter;
@@ -130,13 +129,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
     for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
       PartialPath partialPath = queryPlan.getDeduplicatedPaths().get(i);
       TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
-      String fullPath = partialPath.getFullPath();
-      if (partialPath instanceof VectorPartialPath) {
-        VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
-        if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
-          fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath();
-        }
-      }
+      String fullPath = PartialPath.getExactFullPath(partialPath);
       AssignPathManagedMergeReader assignPathManagedMergeReader =
           new AssignPathManagedMergeReader(fullPath, dataType);
       for (AbstractMultPointReader multPointReader : multPointReaders) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index eeee119..eb47005 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -316,7 +316,7 @@ public class ClusterReaderFactory {
                 context,
                 dataGroupMember,
                 ascending);
-        partialPathPointReaderMap.put(partialPath.getFullPath(), seriesPointReader);
+        partialPathPointReaderMap.put(PartialPath.getExactFullPath(partialPath), seriesPointReader);
       }
 
       if (logger.isDebugEnabled()) {
@@ -578,10 +578,7 @@ public class ClusterReaderFactory {
       Set<String> fullPaths = Sets.newHashSet();
       dataSourceInfo
           .getPartialPaths()
-          .forEach(
-              partialPath -> {
-                fullPaths.add(partialPath.getFullPath());
-              });
+          .forEach(partialPath -> fullPaths.add(partialPath.getFullPath()));
       return new MultEmptyReader(fullPaths);
     }
     throw new StorageEngineException(
@@ -1004,20 +1001,8 @@ public class ClusterReaderFactory {
               context,
               dataGroupMember.getHeader(),
               ascending);
-      if (partialPath instanceof VectorPartialPath) {
-        VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
-        if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
-          partialPathBatchReaderMap.put(
-              vectorPartialPath.getSubSensorsPathList().get(0).getFullPath(),
-              new SeriesRawDataBatchReader(seriesReader));
-        } else {
-          partialPathBatchReaderMap.put(
-              partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader));
-        }
-      } else { // common path
-        partialPathBatchReaderMap.put(
-            partialPath.getFullPath(), new SeriesRawDataBatchReader(seriesReader));
-      }
+      partialPathBatchReaderMap.put(
+          PartialPath.getExactFullPath(partialPath), new SeriesRawDataBatchReader(seriesReader));
     }
     return new MultBatchReader(partialPathBatchReaderMap);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
index 8b9a61a..d608d7e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.VectorPartialPath;
 import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -75,22 +74,14 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
     this.cachedBatchs = Maps.newHashMap();
     this.pathToDataType = Maps.newHashMap();
     for (int i = 0; i < sourceInfo.getPartialPaths().size(); i++) {
-
-      PartialPath partialPath = sourceInfo.getPartialPaths().get(i);
-      String fullPath = partialPath.getFullPath();
-      if (partialPath instanceof VectorPartialPath) {
-        VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
-        if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
-          fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath();
-        }
-      }
+      String fullPath = PartialPath.getExactFullPath(sourceInfo.getPartialPaths().get(i));
       this.cachedBatchs.put(fullPath, new ConcurrentLinkedQueue<>());
       this.pathToDataType.put(fullPath, sourceInfo.getDataTypes().get(i));
     }
   }
 
   @Override
-  public boolean hasNextTimeValuePair(String fullPath) throws IOException {
+  public synchronized boolean hasNextTimeValuePair(String fullPath) throws IOException {
     BatchData batchData = currentBatchDatas.get(fullPath);
     if (batchData != null && batchData.hasCurrent()) {
       return true;
@@ -108,7 +99,7 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
   }
 
   @Override
-  public TimeValuePair nextTimeValuePair(String fullPath) throws IOException {
+  public synchronized TimeValuePair nextTimeValuePair(String fullPath) throws IOException {
     BatchData batchData = currentBatchDatas.get(fullPath);
     if ((batchData == null || !batchData.hasCurrent()) && checkPathBatchData(fullPath)) {
       batchData = cachedBatchs.get(fullPath).poll();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
index 79d075f..54e4362 100755
--- a/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/PartialPath.java
@@ -312,4 +312,19 @@ public class PartialPath extends Path implements Comparable<Path> {
     }
     return ret;
   }
+
+  /**
+   * If partialPath is a VectorPartialPath with exactly one sub sensor, returns that sub sensor's
+   * full path; otherwise, returns partialPath's own full path.
+   */
+  public static String getExactFullPath(PartialPath partialPath) {
+    String fullPath = partialPath.getFullPath();
+    if (partialPath instanceof VectorPartialPath) {
+      VectorPartialPath vectorPartialPath = (VectorPartialPath) partialPath;
+      if (vectorPartialPath.getSubSensorsPathList().size() == 1) {
+        fullPath = vectorPartialPath.getSubSensorsPathList().get(0).getFullPath();
+      }
+    }
+    return fullPath;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 3181d10..f6ac803 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -208,7 +208,7 @@ public class SeriesReader {
         || firstPageReader != null
         || mergeReader.hasNextTimeValuePair()) {
       throw new IOException(
-          "all cached pages should be consumed first cachedPageReaders.isEmpty() is "
+          "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
               + unSeqPageReaders.isEmpty()
               + " firstPageReader != null is "
               + (firstPageReader != null)
@@ -269,7 +269,7 @@ public class SeriesReader {
         || firstPageReader != null
         || mergeReader.hasNextTimeValuePair()) {
       throw new IOException(
-          "all cached pages should be consumed first cachedPageReaders.isEmpty() is "
+          "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
               + unSeqPageReaders.isEmpty()
               + " firstPageReader != null is "
               + (firstPageReader != null)
@@ -430,7 +430,7 @@ public class SeriesReader {
       return true;
     }
 
-    // make sure firstPageReader won't be null while the cachedPageReaders has more cached page
+    // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page
     // readers
     while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {