You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/11/12 04:20:06 UTC
[iotdb] branch rel/0.12 updated: fix bug (#4363)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 099eeea fix bug (#4363)
099eeea is described below
commit 099eeea07b699aa6d8e8847014eecafbbeaf6405
Author: Potato <TX...@gmail.com>
AuthorDate: Fri Nov 12 12:18:13 2021 +0800
fix bug (#4363)
---
.../iotdb/cluster/query/reader/DataSourceInfo.java | 5 +--
.../query/reader/mult/MultDataSourceInfo.java | 5 +--
.../iotdb/db/query/control/QueryFileManager.java | 36 ++++++++++++----------
3 files changed, 26 insertions(+), 20 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index 4ba11e4..e609816 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -97,8 +97,6 @@ public class DataSourceInfo {
if (newReaderId != null) {
logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node);
if (newReaderId != -1) {
- // register the node so the remote resources can be released
- context.registerRemoteNode(node, partitionGroup.getHeader());
this.readerId = newReaderId;
this.curSource = node;
this.curPos = nextNodePos;
@@ -116,6 +114,9 @@ public class DataSourceInfo {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Cannot query {} from {}", this.request.path, node, e);
+ } finally {
+ // register the node so the remote resources can be released
+ context.registerRemoteNode(node, partitionGroup.getHeader());
}
nextNodePos = (nextNodePos + 1) % this.nodes.size();
if (nextNodePos == this.curPos) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
index a4488aa..0b35a4c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
@@ -102,8 +102,6 @@ public class MultDataSourceInfo {
if (newReaderId != null) {
logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node);
if (newReaderId != -1) {
- // register the node so the remote resources can be released
- context.registerRemoteNode(node, partitionGroup.getHeader());
this.readerId = newReaderId;
this.curSource = node;
this.curPos = nextNodePos;
@@ -121,6 +119,9 @@ public class MultDataSourceInfo {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Cannot query {} from {}", this.request.path, node, e);
+ } finally {
+ // register the node so the remote resources can be released
+ context.registerRemoteNode(node, partitionGroup.getHeader());
}
nextNodePos = (nextNodePos + 1) % this.nodes.size();
if (nextNodePos == this.curPos) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
index d75bbe3..70ee622 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
@@ -21,11 +21,9 @@ package org.apache.iotdb.db.query.control;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -36,9 +34,9 @@ import java.util.concurrent.ConcurrentHashMap;
public class QueryFileManager {
/** Map<queryId, Set<filePaths>> */
- private Map<Long, Set<TsFileResource>> sealedFilePathsMap;
+ private Map<Long, Map<TsFileResource, TsFileResource>> sealedFilePathsMap;
- private Map<Long, Set<TsFileResource>> unsealedFilePathsMap;
+ private Map<Long, Map<TsFileResource, TsFileResource>> unsealedFilePathsMap;
QueryFileManager() {
sealedFilePathsMap = new ConcurrentHashMap<>();
@@ -50,8 +48,8 @@ public class QueryFileManager {
* must be invoked.
*/
void addQueryId(long queryId) {
- sealedFilePathsMap.computeIfAbsent(queryId, x -> new HashSet<>());
- unsealedFilePathsMap.computeIfAbsent(queryId, x -> new HashSet<>());
+ sealedFilePathsMap.computeIfAbsent(queryId, x -> new ConcurrentHashMap<>());
+ unsealedFilePathsMap.computeIfAbsent(queryId, x -> new ConcurrentHashMap<>());
}
/** Add the unique file paths to sealedFilePathsMap and unsealedFilePathsMap. */
@@ -73,10 +71,10 @@ public class QueryFileManager {
// this file may be deleted just before we lock it
if (tsFileResource.isDeleted()) {
- Map<Long, Set<TsFileResource>> pathMap =
+ Map<Long, Map<TsFileResource, TsFileResource>> pathMap =
!isClosed ? unsealedFilePathsMap : sealedFilePathsMap;
// This resource may be removed by other threads of this query.
- if (pathMap.get(queryId).remove(tsFileResource)) {
+ if (pathMap.get(queryId).remove(tsFileResource) != null) {
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed);
}
iterator.remove();
@@ -93,7 +91,7 @@ public class QueryFileManager {
sealedFilePathsMap.computeIfPresent(
queryId,
(k, v) -> {
- for (TsFileResource tsFile : v) {
+ for (TsFileResource tsFile : v.keySet()) {
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
}
return null;
@@ -101,7 +99,7 @@ public class QueryFileManager {
unsealedFilePathsMap.computeIfPresent(
queryId,
(k, v) -> {
- for (TsFileResource tsFile : v) {
+ for (TsFileResource tsFile : v.keySet()) {
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false);
}
return null;
@@ -115,11 +113,17 @@ public class QueryFileManager {
* not return null.
*/
void addFilePathToMap(long queryId, TsFileResource tsFile, boolean isClosed) {
- Map<Long, Set<TsFileResource>> pathMap = isClosed ? sealedFilePathsMap : unsealedFilePathsMap;
- // TODO this is not an atomic operation, is there concurrent problem?
- if (!pathMap.get(queryId).contains(tsFile)) {
- pathMap.get(queryId).add(tsFile);
- FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed);
- }
+ Map<Long, Map<TsFileResource, TsFileResource>> pathMap =
+ isClosed ? sealedFilePathsMap : unsealedFilePathsMap;
+ // Although there are no concurrency issues here at the moment, I've implemented thread-safe
+ // code here to avoid leaving holes for future newcomers.
+ pathMap
+ .get(queryId)
+ .computeIfAbsent(
+ tsFile,
+ k -> {
+ FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed);
+ return k;
+ });
}
}