You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/26 08:03:33 UTC
[incubator-iotdb] branch refactor_query_resource_count updated: fix a bug
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch refactor_query_resource_count
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/refactor_query_resource_count by this push:
new 91476d4 fix a bug
91476d4 is described below
commit 91476d44b7202ed95f23885259d6b64278b14663
Author: lta <li...@163.com>
AuthorDate: Fri Apr 26 16:03:08 2019 +0800
fix a bug
---
.../iotdb/db/engine/filenode/FileNodeManager.java | 5 ++---
.../db/engine/filenode/FileNodeProcessor.java | 23 ++++++++++++++++------
2 files changed, 19 insertions(+), 9 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index dbee398..23d1baf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -694,9 +694,8 @@ public class FileNodeManager implements IStatistic, IService {
fileNodeProcessor.getProcessorName());
fileNodeProcessor.decreaseMultiPassCount(token);
} catch (FileNodeProcessorException e) {
- throw new FileNodeManagerException(String
- .format("FileNodeProcessor of [%s] meets error when ending query",
- fileNodeProcessor.getProcessorName()), e);
+ LOGGER.error("Failed to end query: the deviceId {}, token {}.", deviceId, token, e);
+ throw new FileNodeManagerException(e);
} finally {
fileNodeProcessor.writeUnlock();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 2bfc687..d6c32c0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -151,7 +152,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* lock resource when switching status in merge process
*/
private Lock oldMultiPassLock;
- private AtomicInteger oldMultiPassCount = null;
+ private CountDownLatch oldMultiPassCount = null;
private AtomicInteger newMultiPassCount = new AtomicInteger(0);
/**
* system recovery
@@ -757,14 +758,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
return true;
} else if (oldMultiPassTokenSet != null && oldMultiPassTokenSet.contains(token)) {
// remove token first, then unlock
- int oldMultiPassCountValue = oldMultiPassCount.decrementAndGet();
oldMultiPassTokenSet.remove(token);
+ oldMultiPassCount.countDown();
+ long oldMultiPassCountValue = oldMultiPassCount.getCount();
if (oldMultiPassCountValue < 0) {
throw new FileNodeProcessorException(String
.format("Remove MultiPassCount error, oldMultiPassCount:%d", oldMultiPassCountValue));
}
LOGGER.debug("Remove multi token:{}, old set:{}, count:{}", token, oldMultiPassTokenSet,
- oldMultiPassCount);
+ oldMultiPassCount.getCount());
return true;
} else {
LOGGER.error("remove token error:{},new set:{}, old set:{}", token, newMultiPassTokenSet,
@@ -1250,7 +1252,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
writeLock();
try {
oldMultiPassTokenSet = newMultiPassTokenSet;
- oldMultiPassCount = newMultiPassCount;
+ oldMultiPassCount = new CountDownLatch(newMultiPassCount.get());
oldMultiPassLock = new ReentrantLock(false);
newMultiPassTokenSet = new HashSet<>();
newMultiPassCount = new AtomicInteger(0);
@@ -1352,8 +1354,17 @@ public class FileNodeProcessor extends Processor implements IStatistic {
LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Count is {}",
oldMultiPassTokenSet,
oldMultiPassCount);
+ try {
+ oldMultiPassCount.await();
+ } catch (InterruptedException e) {
+ LOGGER.info(
+ "The filenode processor {} encountered an error when it waits until all old queries are over.",
+ getProcessorName());
+ throw new FileNodeProcessorException(e);
+ }
oldMultiPassLock.lock();
}
+
try {
writeLock();
try {
@@ -1710,11 +1721,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
if (oldMultiPassCount == null) {
return true;
}
- if (oldMultiPassCount.get() == 0) {
+ if (oldMultiPassCount.getCount() == 0) {
return true;
} else {
LOGGER.info("The filenode {} can't be closed, because oldMultiPassCount is {}",
- getProcessorName(), oldMultiPassCount);
+ getProcessorName(), oldMultiPassCount.getCount());
return false;
}
}