You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/25 11:30:08 UTC

[incubator-iotdb] branch refactor_query_resource_count updated: refactor query resource count

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch refactor_query_resource_count
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/refactor_query_resource_count by this push:
     new aec4b8e  refactor query resource count
aec4b8e is described below

commit aec4b8e4f8a563e9966316afb2e7388ccdaf966a
Author: lta <li...@163.com>
AuthorDate: Thu Apr 25 19:29:45 2019 +0800

    refactor query resource count
---
 .../iotdb/db/engine/filenode/FileNodeManager.java  |   8 +-
 .../db/engine/filenode/FileNodeProcessor.java      | 136 ++++++++++++---------
 .../db/exception/FileNodeManagerException.java     |   4 +
 3 files changed, 88 insertions(+), 60 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index baf9fa6..2d65721 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -643,7 +643,7 @@ public class FileNodeManager implements IStatistic, IService {
     try {
       LOGGER.debug("Get the FileNodeProcessor: filenode is {}, begin query.",
           fileNodeProcessor.getProcessorName());
-      return fileNodeProcessor.addMultiPassLock();
+      return fileNodeProcessor.addMultiPassCount();
     } finally {
       fileNodeProcessor.writeUnlock();
     }
@@ -694,7 +694,11 @@ public class FileNodeManager implements IStatistic, IService {
     try {
       LOGGER.debug("Get the FileNodeProcessor: {} end query.",
           fileNodeProcessor.getProcessorName());
-      fileNodeProcessor.removeMultiPassLock(token);
+      fileNodeProcessor.decreaseMultiPassCount(token);
+    } catch (FileNodeProcessorException e) {
+      throw new FileNodeManagerException(String
+          .format("FileNodeProcessor of [%s] meets error when ending query",
+              fileNodeProcessor.getProcessorName()), e);
     } finally {
       fileNodeProcessor.writeUnlock();
     }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index d97594c..b4920fa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -40,10 +40,10 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -129,22 +129,37 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   private TsFileResource currentTsFileResource;
   private List<TsFileResource> newFileNodes;
   private FileNodeProcessorStatus isMerging;
-  // this is used when work->merge operation
+
+  /**
+   * this is used when work->merge operation
+   */
   private int numOfMergeFile;
   private FileNodeProcessorStore fileNodeProcessorStore;
   private String fileNodeRestoreFilePath;
   private final Object fileNodeRestoreLock = new Object();
-  // last merge time
+
+  /**
+   * last merge time
+   */
   private long lastMergeTime = -1;
   private BufferWriteProcessor bufferWriteProcessor = null;
   private OverflowProcessor overflowProcessor = null;
   private Set<Integer> oldMultiPassTokenSet = null;
   private Set<Integer> newMultiPassTokenSet = new HashSet<>();
-  private ReadWriteLock oldMultiPassLock = null;
-  private ReadWriteLock newMultiPassLock = new ReentrantReadWriteLock(false);
-  // system recovery
+
+  /**
+   * lock resource when switching status in merge process
+   */
+  private Lock oldMultiPassLock;
+  private AtomicInteger oldMultiPassCount = null;
+  private AtomicInteger newMultiPassCount = new AtomicInteger(0);
+  /**
+   * system recovery
+   */
   private boolean shouldRecovery = false;
-  // statistic monitor parameters
+  /**
+   * statistic monitor parameters
+   */
   private Map<String, Action> parameters;
   private FileSchema fileSchema;
   private Action flushFileNodeProcessorAction = () -> {
@@ -250,7 +265,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
               "directory {}",
           getProcessorName(), restoreFolder.getAbsolutePath());
     }
-    fileNodeRestoreFilePath = new File(restoreFolder, processorName + RESTORE_FILE_SUFFIX).getPath();
+    fileNodeRestoreFilePath = new File(restoreFolder, processorName + RESTORE_FILE_SUFFIX)
+        .getPath();
     try {
       fileNodeProcessorStore = readStoreFromDisk();
     } catch (FileNodeProcessorException e) {
@@ -713,9 +729,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   /**
    * add multiple pass lock.
    */
-  public int addMultiPassLock() {
-    LOGGER.debug("Add MultiPassLock: read lock newMultiPassLock.");
-    newMultiPassLock.readLock().lock();
+  public int addMultiPassCount() {
+    LOGGER.debug("Add MultiPassCount: read lock newMultiPassCount.");
+    newMultiPassCount.incrementAndGet();
     while (newMultiPassTokenSet.contains(multiPassLockToken)) {
       multiPassLockToken++;
     }
@@ -725,22 +741,30 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   }
 
   /**
-   * remove multiple pass lock. TODO: use the return value or remove it.
+   * decrease multiple pass count. TODO: use the return value or remove it.
    */
-  public boolean removeMultiPassLock(int token) {
+  public boolean decreaseMultiPassCount(int token) throws FileNodeProcessorException {
     if (newMultiPassTokenSet.contains(token)) {
-      newMultiPassLock.readLock().unlock();
+      int newMultiPassCountValue = newMultiPassCount.decrementAndGet();
+      if (newMultiPassCountValue < 0) {
+        throw new FileNodeProcessorException(String
+            .format("Remove MultiPassCount error, newMultiPassCount:%d", newMultiPassCountValue));
+      }
       newMultiPassTokenSet.remove(token);
-      LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token,
+      LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, count:{}", token,
           getProcessorName(),
-          newMultiPassTokenSet, newMultiPassLock);
+          newMultiPassTokenSet, newMultiPassCount);
       return true;
     } else if (oldMultiPassTokenSet != null && oldMultiPassTokenSet.contains(token)) {
       // remove token first, then unlock
-      oldMultiPassLock.readLock().unlock();
+      int oldMultiPassCountValue = oldMultiPassCount.decrementAndGet();
       oldMultiPassTokenSet.remove(token);
-      LOGGER.debug("Remove multi token:{}, old set:{}, lock:{}", token, oldMultiPassTokenSet,
-          oldMultiPassLock);
+      if (oldMultiPassCountValue < 0) {
+        throw new FileNodeProcessorException(String
+            .format("Remove MultiPassCount error, oldMultiPassCount:%d", oldMultiPassCountValue));
+      }
+      LOGGER.debug("Remove multi token:{}, old set:{}, count:{}", token, oldMultiPassTokenSet,
+          oldMultiPassCount);
       return true;
     } else {
       LOGGER.error("remove token error:{},new set:{}, old set:{}", token, newMultiPassTokenSet,
@@ -754,7 +778,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * query data.
    */
   public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId,
-         QueryContext context) throws FileNodeProcessorException {
+      QueryContext context) throws FileNodeProcessorException {
     // query overflow data
     MeasurementSchema mSchema;
     TSDataType dataType;
@@ -1226,9 +1250,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     writeLock();
     try {
       oldMultiPassTokenSet = newMultiPassTokenSet;
-      oldMultiPassLock = newMultiPassLock;
+      oldMultiPassCount = newMultiPassCount;
+      oldMultiPassLock = new ReentrantLock(false);
       newMultiPassTokenSet = new HashSet<>();
-      newMultiPassLock = new ReentrantReadWriteLock(false);
+      newMultiPassCount = new AtomicInteger(0);
       List<TsFileResource> result = new ArrayList<>();
       int beginIndex = 0;
       if (needEmpty) {
@@ -1323,11 +1348,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     LOGGER.info("The status of filenode processor {} switches from {} to {}.", getProcessorName(),
         FileNodeProcessorStatus.WAITING, FileNodeProcessorStatus.NONE);
 
-    if (oldMultiPassLock != null) {
-      LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Lock is {}",
+    if (oldMultiPassCount != null) {
+      LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Count is {}",
           oldMultiPassTokenSet,
-          oldMultiPassLock);
-      oldMultiPassLock.writeLock().lock();
+          oldMultiPassCount);
+      oldMultiPassLock.lock();
     }
     try {
       writeLock();
@@ -1378,9 +1403,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     } finally {
       oldMultiPassTokenSet = null;
       if (oldMultiPassLock != null) {
-        oldMultiPassLock.writeLock().unlock();
+        oldMultiPassLock.unlock();
       }
-      oldMultiPassLock = null;
+      oldMultiPassCount = null;
     }
 
   }
@@ -1485,9 +1510,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
                   TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
           SingleSeriesExpression seriesFilter = new SingleSeriesExpression(path, timeFilter);
 
-          for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource.getOverflowInsertFileList()) {
-            FileReaderManager.getInstance().increaseFileReaderReference(overflowInsertFile.getFilePath(),
-                false);
+          for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource
+              .getOverflowInsertFileList()) {
+            FileReaderManager.getInstance()
+                .increaseFileReaderReference(overflowInsertFile.getFilePath(),
+                    false);
           }
 
           IReader seriesReader = SeriesReaderFactory.getInstance()
@@ -1574,8 +1601,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         seriesWriterImpl.writeToFileWriter(mergeFileWriter);
       }
     } finally {
-      for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource.getOverflowInsertFileList()) {
-        FileReaderManager.getInstance().decreaseFileReaderReference(overflowInsertFile.getFilePath(),
+      for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource
+          .getOverflowInsertFileList()) {
+        FileReaderManager.getInstance()
+            .decreaseFileReaderReference(overflowInsertFile.getFilePath(),
                 false);
       }
     }
@@ -1672,30 +1701,21 @@ public class FileNodeProcessor extends Processor implements IStatistic {
           isMerging);
       return false;
     }
-    if (!newMultiPassLock.writeLock().tryLock()) {
-      LOGGER.info("The filenode {} can't be closed, because it can't get newMultiPassLock {}",
-          getProcessorName(), newMultiPassLock);
+    if (newMultiPassCount.get() != 0) {
+      LOGGER.info("The filenode {} can't be closed, because newMultiPassCount is {}",
+          getProcessorName(), newMultiPassCount);
       return false;
     }
 
-    try {
-      if (oldMultiPassLock == null) {
-        return true;
-      }
-      if (oldMultiPassLock.writeLock().tryLock()) {
-        try {
-          return true;
-        } finally {
-          oldMultiPassLock.writeLock().unlock();
-        }
-      } else {
-        LOGGER.info("The filenode {} can't be closed, because it can't get"
-                + " oldMultiPassLock {}",
-            getProcessorName(), oldMultiPassLock);
-        return false;
-      }
-    } finally {
-      newMultiPassLock.writeLock().unlock();
+    if (oldMultiPassCount == null) {
+      return true;
+    }
+    if (oldMultiPassCount.get() == 0) {
+      return true;
+    } else {
+      LOGGER.info("The filenode {} can't be closed, because oldMultiPassCount is {}",
+          getProcessorName(), oldMultiPassCount);
+      return false;
     }
   }
 
@@ -1991,8 +2011,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         Objects.equals(overflowProcessor, that.overflowProcessor) &&
         Objects.equals(oldMultiPassTokenSet, that.oldMultiPassTokenSet) &&
         Objects.equals(newMultiPassTokenSet, that.newMultiPassTokenSet) &&
-        Objects.equals(oldMultiPassLock, that.oldMultiPassLock) &&
-        Objects.equals(newMultiPassLock, that.newMultiPassLock) &&
+        Objects.equals(oldMultiPassCount, that.oldMultiPassCount) &&
+        Objects.equals(newMultiPassCount, that.newMultiPassCount) &&
         Objects.equals(parameters, that.parameters) &&
         Objects.equals(fileSchema, that.fileSchema) &&
         Objects.equals(flushFileNodeProcessorAction, that.flushFileNodeProcessorAction) &&
@@ -2008,7 +2028,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         emptyTsFileResource, currentTsFileResource, newFileNodes, isMerging,
         numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath,
         lastMergeTime, bufferWriteProcessor, overflowProcessor, oldMultiPassTokenSet,
-        newMultiPassTokenSet, oldMultiPassLock, newMultiPassLock, shouldRecovery, parameters,
+        newMultiPassTokenSet, oldMultiPassCount, newMultiPassCount, shouldRecovery, parameters,
         fileSchema, flushFileNodeProcessorAction, bufferwriteFlushAction,
         bufferwriteCloseAction, overflowFlushAction, multiPassLockToken);
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java
index 1e5e11d..0c15c50 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java
@@ -26,6 +26,10 @@ public class FileNodeManagerException extends Exception {
     super();
   }
 
+  public FileNodeManagerException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
   public FileNodeManagerException(String message) {
     super(message);
   }