You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/04 08:15:02 UTC

[iotdb] 01/01: fix query dead lock

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryBlocking
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0fd2015fab8c5a9cba95b0700bdb4d9071358c8e
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jan 4 16:14:18 2022 +0800

    fix query dead lock
---
 .../dataset/RawQueryDataSetWithoutValueFilter.java      | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index d739d10..21d612e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -94,12 +94,20 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
             }
             blockingQueue.put(batchData);
 
+            // has limit clause
             if (batchDataLengthList != null) {
               batchDataLengthList[seriesIndex] += batchData.length();
               if (batchDataLengthList[seriesIndex] >= fetchLimit) {
-                break;
+                // the queue has enough space to hold SignalBatchData, just break the while loop
+                if (blockingQueue.remainingCapacity() > 0) {
+                  break;
+                } else { // otherwise, exit without putting SignalBatchData; the main thread will submit a new task, which will then put SignalBatchData successfully
+                  reader.setManagedByQueryManager(false);
+                  return;
+                }
               }
             }
+
             // if the queue also has free space, just submit another itself
             if (blockingQueue.remainingCapacity() > 0) {
               TASK_POOL_MANAGER.submit(this);
@@ -171,6 +179,9 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
 
   private final long queryId;
 
+  // this field records the original value of the offset clause; it does not change during query execution
+  protected final int originalRowOffset;
+
   private static final RawQueryReadTaskPoolManager TASK_POOL_MANAGER =
       RawQueryReadTaskPoolManager.getInstance();
 
@@ -190,6 +201,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
         queryPlan.getDeduplicatedDataTypes(),
         queryPlan.isAscending());
     this.rowLimit = queryPlan.getRowLimit();
+    this.originalRowOffset = queryPlan.getRowOffset();
     this.rowOffset = queryPlan.getRowOffset();
     this.withoutAnyNull = queryPlan.isWithoutAnyNull();
     this.withoutAllNull = queryPlan.isWithoutAllNull();
@@ -215,6 +227,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
    */
   public RawQueryDataSetWithoutValueFilter(long queryId) {
     this.queryId = queryId;
+    this.originalRowOffset = 0;
     blockingQueueArray = new BlockingQueue[0];
     timeHeap = new TimeSelector(0, ascending);
   }
@@ -246,7 +259,7 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
         paths.get(seriesIndex).getFullPath(),
         batchDataLengthList,
         seriesIndex,
-        rowLimit + rowOffset);
+        rowLimit + originalRowOffset);
   }
 
   /**