You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/11/12 02:28:00 UTC

[iotdb] branch jira1865_master created (now 5cb6baf)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch jira1865_master
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 5cb6baf  fix

This branch includes the following new commits:

     new 5cb6baf  fix

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: fix

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira1865_master
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5cb6baf27fed1a4f740ddf2176db99206148c26a
Author: LebronAl <TX...@gmail.com>
AuthorDate: Fri Nov 12 09:52:24 2021 +0800

    fix
---
 .../iotdb/cluster/query/reader/DataSourceInfo.java |  5 +--
 .../query/reader/mult/MultDataSourceInfo.java      |  5 +--
 .../iotdb/db/query/control/QueryFileManager.java   | 36 ++++++++++++----------
 3 files changed, 26 insertions(+), 20 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index 02be077..57eb9d8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -95,8 +95,6 @@ public class DataSourceInfo {
         if (newReaderId != null) {
           logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node);
           if (newReaderId != -1) {
-            // register the node so the remote resources can be released
-            context.registerRemoteNode(node, partitionGroup.getHeader());
             this.readerId = newReaderId;
             this.curSource = node;
             this.curPos = nextNodePos;
@@ -114,6 +112,9 @@ public class DataSourceInfo {
         logger.error("Cannot query {} from {}", this.request.path, node, e);
       } catch (Exception e) {
         logger.error("Cannot query {} from {}", this.request.path, node, e);
+      } finally {
+        // register the node so the remote resources can be released
+        context.registerRemoteNode(node, partitionGroup.getHeader());
       }
       nextNodePos = (nextNodePos + 1) % this.nodes.size();
       if (nextNodePos == this.curPos) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
index e849f04..1521ba3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
@@ -100,8 +100,6 @@ public class MultDataSourceInfo {
         if (newReaderId != null) {
           logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node);
           if (newReaderId != -1) {
-            // register the node so the remote resources can be released
-            context.registerRemoteNode(node, partitionGroup.getHeader());
             this.readerId = newReaderId;
             this.curSource = node;
             this.curPos = nextNodePos;
@@ -119,6 +117,9 @@ public class MultDataSourceInfo {
         logger.error("Cannot query {} from {}", this.request.path, node, e);
       } catch (Exception e) {
         logger.error("Cannot query {} from {}", this.request.path, node, e);
+      } finally {
+        // register the node so the remote resources can be released
+        context.registerRemoteNode(node, partitionGroup.getHeader());
       }
       nextNodePos = (nextNodePos + 1) % this.nodes.size();
       if (nextNodePos == this.curPos) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
index d75bbe3..70ee622 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
@@ -21,11 +21,9 @@ package org.apache.iotdb.db.query.control;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -36,9 +34,9 @@ import java.util.concurrent.ConcurrentHashMap;
 public class QueryFileManager {
 
   /** Map<queryId, Set<filePaths>> */
-  private Map<Long, Set<TsFileResource>> sealedFilePathsMap;
+  private Map<Long, Map<TsFileResource, TsFileResource>> sealedFilePathsMap;
 
-  private Map<Long, Set<TsFileResource>> unsealedFilePathsMap;
+  private Map<Long, Map<TsFileResource, TsFileResource>> unsealedFilePathsMap;
 
   QueryFileManager() {
     sealedFilePathsMap = new ConcurrentHashMap<>();
@@ -50,8 +48,8 @@ public class QueryFileManager {
    * must be invoked.
    */
   void addQueryId(long queryId) {
-    sealedFilePathsMap.computeIfAbsent(queryId, x -> new HashSet<>());
-    unsealedFilePathsMap.computeIfAbsent(queryId, x -> new HashSet<>());
+    sealedFilePathsMap.computeIfAbsent(queryId, x -> new ConcurrentHashMap<>());
+    unsealedFilePathsMap.computeIfAbsent(queryId, x -> new ConcurrentHashMap<>());
   }
 
   /** Add the unique file paths to sealedFilePathsMap and unsealedFilePathsMap. */
@@ -73,10 +71,10 @@ public class QueryFileManager {
 
       // this file may be deleted just before we lock it
       if (tsFileResource.isDeleted()) {
-        Map<Long, Set<TsFileResource>> pathMap =
+        Map<Long, Map<TsFileResource, TsFileResource>> pathMap =
             !isClosed ? unsealedFilePathsMap : sealedFilePathsMap;
         // This resource may be removed by other threads of this query.
-        if (pathMap.get(queryId).remove(tsFileResource)) {
+        if (pathMap.get(queryId).remove(tsFileResource) != null) {
           FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed);
         }
         iterator.remove();
@@ -93,7 +91,7 @@ public class QueryFileManager {
     sealedFilePathsMap.computeIfPresent(
         queryId,
         (k, v) -> {
-          for (TsFileResource tsFile : v) {
+          for (TsFileResource tsFile : v.keySet()) {
             FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
           }
           return null;
@@ -101,7 +99,7 @@ public class QueryFileManager {
     unsealedFilePathsMap.computeIfPresent(
         queryId,
         (k, v) -> {
-          for (TsFileResource tsFile : v) {
+          for (TsFileResource tsFile : v.keySet()) {
             FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false);
           }
           return null;
@@ -115,11 +113,17 @@ public class QueryFileManager {
    * not return null.
    */
   void addFilePathToMap(long queryId, TsFileResource tsFile, boolean isClosed) {
-    Map<Long, Set<TsFileResource>> pathMap = isClosed ? sealedFilePathsMap : unsealedFilePathsMap;
-    // TODO this is not an atomic operation, is there concurrent problem?
-    if (!pathMap.get(queryId).contains(tsFile)) {
-      pathMap.get(queryId).add(tsFile);
-      FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed);
-    }
+    Map<Long, Map<TsFileResource, TsFileResource>> pathMap =
+        isClosed ? sealedFilePathsMap : unsealedFilePathsMap;
+    // Although there are no concurrency issues here at the moment, I've implemented thread-safe
+    // code here to avoid leaving holes for future newcomers.
+    pathMap
+        .get(queryId)
+        .computeIfAbsent(
+            tsFile,
+            k -> {
+              FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed);
+              return k;
+            });
   }
 }