You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/26 08:03:33 UTC

[incubator-iotdb] branch refactor_query_resource_count updated: fix a bug

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch refactor_query_resource_count
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/refactor_query_resource_count by this push:
     new 91476d4  fix a bug
91476d4 is described below

commit 91476d44b7202ed95f23885259d6b64278b14663
Author: lta <li...@163.com>
AuthorDate: Fri Apr 26 16:03:08 2019 +0800

    fix a bug
---
 .../iotdb/db/engine/filenode/FileNodeManager.java  |  5 ++---
 .../db/engine/filenode/FileNodeProcessor.java      | 23 ++++++++++++++++------
 2 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index dbee398..23d1baf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -694,9 +694,8 @@ public class FileNodeManager implements IStatistic, IService {
           fileNodeProcessor.getProcessorName());
       fileNodeProcessor.decreaseMultiPassCount(token);
     } catch (FileNodeProcessorException e) {
-      throw new FileNodeManagerException(String
-          .format("FileNodeProcessor of [%s] meets error when ending query",
-              fileNodeProcessor.getProcessorName()), e);
+      LOGGER.error("Failed to end query: the deviceId {}, token {}.", deviceId, token, e);
+      throw new FileNodeManagerException(e);
     } finally {
       fileNodeProcessor.writeUnlock();
     }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 2bfc687..d6c32c0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -151,7 +152,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * lock resource when switching status in merge process
    */
   private Lock oldMultiPassLock;
-  private AtomicInteger oldMultiPassCount = null;
+  private CountDownLatch oldMultiPassCount = null;
   private AtomicInteger newMultiPassCount = new AtomicInteger(0);
   /**
    * system recovery
@@ -757,14 +758,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       return true;
     } else if (oldMultiPassTokenSet != null && oldMultiPassTokenSet.contains(token)) {
       // remove token first, then unlock
-      int oldMultiPassCountValue = oldMultiPassCount.decrementAndGet();
       oldMultiPassTokenSet.remove(token);
+      oldMultiPassCount.countDown();
+      long oldMultiPassCountValue = oldMultiPassCount.getCount();
       if (oldMultiPassCountValue < 0) {
         throw new FileNodeProcessorException(String
             .format("Remove MultiPassCount error, oldMultiPassCount:%d", oldMultiPassCountValue));
       }
       LOGGER.debug("Remove multi token:{}, old set:{}, count:{}", token, oldMultiPassTokenSet,
-          oldMultiPassCount);
+          oldMultiPassCount.getCount());
       return true;
     } else {
       LOGGER.error("remove token error:{},new set:{}, old set:{}", token, newMultiPassTokenSet,
@@ -1250,7 +1252,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     writeLock();
     try {
       oldMultiPassTokenSet = newMultiPassTokenSet;
-      oldMultiPassCount = newMultiPassCount;
+      oldMultiPassCount = new CountDownLatch(newMultiPassCount.get());
       oldMultiPassLock = new ReentrantLock(false);
       newMultiPassTokenSet = new HashSet<>();
       newMultiPassCount = new AtomicInteger(0);
@@ -1352,8 +1354,17 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Count is {}",
           oldMultiPassTokenSet,
           oldMultiPassCount);
+      try {
+        oldMultiPassCount.await();
+      } catch (InterruptedException e) {
+        LOGGER.info(
+            "The filenode processor {} encountered an error when it waits until all old queries are over.",
+            getProcessorName());
+        throw new FileNodeProcessorException(e);
+      }
       oldMultiPassLock.lock();
     }
+
     try {
       writeLock();
       try {
@@ -1710,11 +1721,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     if (oldMultiPassCount == null) {
       return true;
     }
-    if (oldMultiPassCount.get() == 0) {
+    if (oldMultiPassCount.getCount() == 0) {
       return true;
     } else {
       LOGGER.info("The filenode {} can't be closed, because oldMultiPassCount is {}",
-          getProcessorName(), oldMultiPassCount);
+          getProcessorName(), oldMultiPassCount.getCount());
       return false;
     }
   }