You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/07 15:28:29 UTC

[incubator-iotdb] branch master updated: Refactor query resource count (#168)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f554c7  Refactor query resource count (#168)
4f554c7 is described below

commit 4f554c746d81b5b127afd3e79850de04c88f56ab
Author: Tianan Li <li...@163.com>
AuthorDate: Tue May 7 23:28:25 2019 +0800

    Refactor query resource count (#168)
    
    * refactor query resource count
    
    * format code
    
    * fix a bug
    
    * remove lock
---
 .../iotdb/db/engine/filenode/FileNodeManager.java  |  18 +--
 .../db/engine/filenode/FileNodeProcessor.java      | 156 ++++++++++++---------
 .../db/exception/FileNodeManagerException.java     |   4 +
 3 files changed, 106 insertions(+), 72 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 22778bb..516abdc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -190,10 +190,7 @@ public class FileNodeManager implements IStatistic, IService {
   }
 
   /**
-   *
    * @param filenodeName storage name, e.g., root.a.b
-   * @return
-   * @throws FileNodeManagerException
    */
   private FileNodeProcessor constructNewProcessor(String filenodeName)
       throws FileNodeManagerException {
@@ -429,7 +426,7 @@ public class FileNodeManager implements IStatistic, IService {
     fileNodeProcessor.setIntervalFileNodeStartTime(deviceId);
     fileNodeProcessor.setLastUpdateTime(deviceId, timestamp);
     try {
-      if(!bufferWriteProcessor.write(tsRecord)) {
+      if (!bufferWriteProcessor.write(tsRecord)) {
         // undo time update
         fileNodeProcessor.setIntervalFileNodeStartTime(deviceId, prevStartTime);
         fileNodeProcessor.setLastUpdateTime(deviceId, prevUpdateTime);
@@ -639,7 +636,8 @@ public class FileNodeManager implements IStatistic, IService {
 
   /**
    * begin query.
-   * @param  deviceId queried deviceId
+   *
+   * @param deviceId queried deviceId
    * @return a query token for the device.
    */
   public int beginQuery(String deviceId) throws FileNodeManagerException {
@@ -647,7 +645,7 @@ public class FileNodeManager implements IStatistic, IService {
     try {
       LOGGER.debug("Get the FileNodeProcessor: filenode is {}, begin query.",
           fileNodeProcessor.getProcessorName());
-      return fileNodeProcessor.addMultiPassLock();
+      return fileNodeProcessor.addMultiPassCount();
     } finally {
       fileNodeProcessor.writeUnlock();
     }
@@ -698,7 +696,10 @@ public class FileNodeManager implements IStatistic, IService {
     try {
       LOGGER.debug("Get the FileNodeProcessor: {} end query.",
           fileNodeProcessor.getProcessorName());
-      fileNodeProcessor.removeMultiPassLock(token);
+      fileNodeProcessor.decreaseMultiPassCount(token);
+    } catch (FileNodeProcessorException e) {
+      LOGGER.error("Failed to end query: the deviceId {}, token {}.", deviceId, token, e);
+      throw new FileNodeManagerException(e);
     } finally {
       fileNodeProcessor.writeUnlock();
     }
@@ -957,7 +958,8 @@ public class FileNodeManager implements IStatistic, IService {
   /**
    * add time series.
    */
-  public void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding, CompressionType compressor,
+  public void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding,
+      CompressionType compressor,
       Map<String, String> props) throws FileNodeManagerException {
     FileNodeProcessor fileNodeProcessor = getProcessor(path.getFullPath(), true);
     try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 6428fc6..209c1f1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -38,12 +38,12 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -129,22 +129,41 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   private TsFileResource currentTsFileResource;
   private List<TsFileResource> newFileNodes;
   private FileNodeProcessorStatus isMerging;
-  // this is used when work->merge operation
+
+  /**
+   * this is used when work->merge operation
+   */
   private int numOfMergeFile;
   private FileNodeProcessorStore fileNodeProcessorStore;
   private String fileNodeRestoreFilePath;
   private final Object fileNodeRestoreLock = new Object();
-  // last merge time
+
+  /**
+   * last merge time
+   */
   private long lastMergeTime = -1;
   private BufferWriteProcessor bufferWriteProcessor = null;
   private OverflowProcessor overflowProcessor = null;
   private Set<Integer> oldMultiPassTokenSet = null;
   private Set<Integer> newMultiPassTokenSet = new HashSet<>();
-  private ReadWriteLock oldMultiPassLock = null;
-  private ReadWriteLock newMultiPassLock = new ReentrantReadWriteLock(false);
-  // system recovery
+
+  /**
+   * Represents the number of old queries that have not ended.
+   * This count only decreases; it never increases.
+   */
+  private CountDownLatch oldMultiPassCount = null;
+
+  /**
+   * Represents the number of new queries that have not ended.
+   */
+  private AtomicInteger newMultiPassCount = new AtomicInteger(0);
+  /**
+   * system recovery
+   */
   private boolean shouldRecovery = false;
-  // statistic monitor parameters
+  /**
+   * statistic monitor parameters
+   */
   private Map<String, Action> parameters;
   private FileSchema fileSchema;
   private Action flushFileNodeProcessorAction = () -> {
@@ -250,7 +269,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
               "directory {}",
           getProcessorName(), restoreFolder.getAbsolutePath());
     }
-    fileNodeRestoreFilePath = new File(restoreFolder, processorName + RESTORE_FILE_SUFFIX).getPath();
+    fileNodeRestoreFilePath = new File(restoreFolder, processorName + RESTORE_FILE_SUFFIX)
+        .getPath();
     try {
       fileNodeProcessorStore = readStoreFromDisk();
     } catch (FileNodeProcessorException e) {
@@ -518,9 +538,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       } catch (BufferWriteProcessorException e) {
         throw new FileNodeProcessorException(String
             .format("The filenode processor %s failed to get the bufferwrite processor.",
-                processorName),e);
+                processorName), e);
       }
-    } else if (bufferWriteProcessor.isClosed()){
+    } else if (bufferWriteProcessor.isClosed()) {
       try {
         bufferWriteProcessor.reopen(insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
             + System.currentTimeMillis());
@@ -543,7 +563,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
           .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
       overflowProcessor = new OverflowProcessor(processorName, params, fileSchema,
           versionController);
-    } else if (overflowProcessor.isClosed()){
+    } else if (overflowProcessor.isClosed()) {
       overflowProcessor.reopen();
     }
     return overflowProcessor;
@@ -704,9 +724,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   /**
    * add multiple pass lock.
    */
-  public int addMultiPassLock() {
-    LOGGER.debug("Add MultiPassLock: read lock newMultiPassLock.");
-    newMultiPassLock.readLock().lock();
+  public int addMultiPassCount() {
+    LOGGER.debug("Add MultiPassCount: read lock newMultiPassCount.");
+    newMultiPassCount.incrementAndGet();
     while (newMultiPassTokenSet.contains(multiPassLockToken)) {
       multiPassLockToken++;
     }
@@ -716,22 +736,31 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   }
 
   /**
-   * remove multiple pass lock. TODO: use the return value or remove it.
+   * decrease multiple pass count. TODO: use the return value or remove it.
    */
-  public boolean removeMultiPassLock(int token) {
+  public boolean decreaseMultiPassCount(int token) throws FileNodeProcessorException {
     if (newMultiPassTokenSet.contains(token)) {
-      newMultiPassLock.readLock().unlock();
+      int newMultiPassCountValue = newMultiPassCount.decrementAndGet();
+      if (newMultiPassCountValue < 0) {
+        throw new FileNodeProcessorException(String
+            .format("Remove MultiPassCount error, newMultiPassCount:%d", newMultiPassCountValue));
+      }
       newMultiPassTokenSet.remove(token);
-      LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token,
+      LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, count:{}", token,
           getProcessorName(),
-          newMultiPassTokenSet, newMultiPassLock);
+          newMultiPassTokenSet, newMultiPassCount);
       return true;
     } else if (oldMultiPassTokenSet != null && oldMultiPassTokenSet.contains(token)) {
       // remove token first, then unlock
-      oldMultiPassLock.readLock().unlock();
       oldMultiPassTokenSet.remove(token);
-      LOGGER.debug("Remove multi token:{}, old set:{}, lock:{}", token, oldMultiPassTokenSet,
-          oldMultiPassLock);
+      oldMultiPassCount.countDown();
+      long oldMultiPassCountValue = oldMultiPassCount.getCount();
+      if (oldMultiPassCountValue < 0) {
+        throw new FileNodeProcessorException(String
+            .format("Remove MultiPassCount error, oldMultiPassCount:%d", oldMultiPassCountValue));
+      }
+      LOGGER.debug("Remove multi token:{}, old set:{}, count:{}", token, oldMultiPassTokenSet,
+          oldMultiPassCount.getCount());
       return true;
     } else {
       LOGGER.error("remove token error:{},new set:{}, old set:{}", token, newMultiPassTokenSet,
@@ -745,7 +774,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * query data.
    */
   public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId,
-         QueryContext context) throws FileNodeProcessorException {
+      QueryContext context) throws FileNodeProcessorException {
     // query overflow data
     MeasurementSchema mSchema;
     TSDataType dataType;
@@ -1221,9 +1250,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     writeLock();
     try {
       oldMultiPassTokenSet = newMultiPassTokenSet;
-      oldMultiPassLock = newMultiPassLock;
+      oldMultiPassCount = new CountDownLatch(newMultiPassCount.get());
       newMultiPassTokenSet = new HashSet<>();
-      newMultiPassLock = new ReentrantReadWriteLock(false);
+      newMultiPassCount = new AtomicInteger(0);
       List<TsFileResource> result = new ArrayList<>();
       int beginIndex = 0;
       if (needEmpty) {
@@ -1318,12 +1347,20 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     LOGGER.info("The status of filenode processor {} switches from {} to {}.", getProcessorName(),
         FileNodeProcessorStatus.WAITING, FileNodeProcessorStatus.NONE);
 
-    if (oldMultiPassLock != null) {
-      LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Lock is {}",
+    if (oldMultiPassCount != null) {
+      LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Count is {}",
           oldMultiPassTokenSet,
-          oldMultiPassLock);
-      oldMultiPassLock.writeLock().lock();
+          oldMultiPassCount);
+      try {
+        oldMultiPassCount.await();
+      } catch (InterruptedException e) {
+        LOGGER.info(
+            "The filenode processor {} encountered an error when it waits for all old queries over.",
+            getProcessorName());
+        throw new FileNodeProcessorException(e);
+      }
     }
+
     try {
       writeLock();
       try {
@@ -1372,10 +1409,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       }
     } finally {
       oldMultiPassTokenSet = null;
-      if (oldMultiPassLock != null) {
-        oldMultiPassLock.writeLock().unlock();
-      }
-      oldMultiPassLock = null;
+      oldMultiPassCount = null;
     }
 
   }
@@ -1478,9 +1512,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
                   TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
           SingleSeriesExpression seriesFilter = new SingleSeriesExpression(path, timeFilter);
 
-          for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource.getOverflowInsertFileList()) {
-            FileReaderManager.getInstance().increaseFileReaderReference(overflowInsertFile.getFilePath(),
-                false);
+          for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource
+              .getOverflowInsertFileList()) {
+            FileReaderManager.getInstance()
+                .increaseFileReaderReference(overflowInsertFile.getFilePath(),
+                    false);
           }
 
           IReader seriesReader = SeriesReaderFactory.getInstance()
@@ -1566,8 +1602,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         seriesWriterImpl.writeToFileWriter(mergeFileWriter);
       }
     } finally {
-      for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource.getOverflowInsertFileList()) {
-        FileReaderManager.getInstance().decreaseFileReaderReference(overflowInsertFile.getFilePath(),
+      for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource
+          .getOverflowInsertFileList()) {
+        FileReaderManager.getInstance()
+            .decreaseFileReaderReference(overflowInsertFile.getFilePath(),
                 false);
       }
     }
@@ -1664,31 +1702,21 @@ public class FileNodeProcessor extends Processor implements IStatistic {
           isMerging);
       return false;
     }
-    if (!newMultiPassLock.writeLock().tryLock()) {
-      LOGGER.warn(
-          "The filenode {} can't be closed, because it can't get newMultiPassLock {}. The newMultiPassTokenSet is {}",
-          getProcessorName(), newMultiPassLock, newMultiPassTokenSet);
+    if (newMultiPassCount.get() != 0) {
+      LOGGER.warn("The filenode {} can't be closed, because newMultiPassCount is {}. The newMultiPassTokenSet is {}",
+          getProcessorName(), newMultiPassCount, newMultiPassTokenSet);
       return false;
     }
 
-    try {
-      if (oldMultiPassLock == null) {
-        return true;
-      }
-      if (oldMultiPassLock.writeLock().tryLock()) {
-        try {
-          return true;
-        } finally {
-          oldMultiPassLock.writeLock().unlock();
-        }
-      } else {
-        LOGGER.info("The filenode {} can't be closed, because it can't get"
-                + " oldMultiPassLock {}",
-            getProcessorName(), oldMultiPassLock);
-        return false;
-      }
-    } finally {
-      newMultiPassLock.writeLock().unlock();
+    if (oldMultiPassCount == null) {
+      return true;
+    }
+    if (oldMultiPassCount.getCount() == 0) {
+      return true;
+    } else {
+      LOGGER.info("The filenode {} can't be closed, because oldMultiPassCount is {}",
+          getProcessorName(), oldMultiPassCount.getCount());
+      return false;
     }
   }
 
@@ -1988,8 +2016,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         Objects.equals(overflowProcessor, that.overflowProcessor) &&
         Objects.equals(oldMultiPassTokenSet, that.oldMultiPassTokenSet) &&
         Objects.equals(newMultiPassTokenSet, that.newMultiPassTokenSet) &&
-        Objects.equals(oldMultiPassLock, that.oldMultiPassLock) &&
-        Objects.equals(newMultiPassLock, that.newMultiPassLock) &&
+        Objects.equals(oldMultiPassCount, that.oldMultiPassCount) &&
+        Objects.equals(newMultiPassCount, that.newMultiPassCount) &&
         Objects.equals(parameters, that.parameters) &&
         Objects.equals(fileSchema, that.fileSchema) &&
         Objects.equals(flushFileNodeProcessorAction, that.flushFileNodeProcessorAction) &&
@@ -2005,7 +2033,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         emptyTsFileResource, currentTsFileResource, newFileNodes, isMerging,
         numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath,
         lastMergeTime, bufferWriteProcessor, overflowProcessor, oldMultiPassTokenSet,
-        newMultiPassTokenSet, oldMultiPassLock, newMultiPassLock, shouldRecovery, parameters,
+        newMultiPassTokenSet, oldMultiPassCount, newMultiPassCount, shouldRecovery, parameters,
         fileSchema, flushFileNodeProcessorAction, bufferwriteFlushAction,
         bufferwriteCloseAction, overflowFlushAction, multiPassLockToken);
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java
index 1e5e11d..0c15c50 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java
@@ -26,6 +26,10 @@ public class FileNodeManagerException extends Exception {
     super();
   }
 
+  public FileNodeManagerException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
   public FileNodeManagerException(String message) {
     super(message);
   }