You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/25 11:30:08 UTC
[incubator-iotdb] branch refactor_query_resource_count updated:
refactor query resource count
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch refactor_query_resource_count
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/refactor_query_resource_count by this push:
new aec4b8e refactor query resource count
aec4b8e is described below
commit aec4b8e4f8a563e9966316afb2e7388ccdaf966a
Author: lta <li...@163.com>
AuthorDate: Thu Apr 25 19:29:45 2019 +0800
refactor query resource count
---
.../iotdb/db/engine/filenode/FileNodeManager.java | 8 +-
.../db/engine/filenode/FileNodeProcessor.java | 136 ++++++++++++---------
.../db/exception/FileNodeManagerException.java | 4 +
3 files changed, 88 insertions(+), 60 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index baf9fa6..2d65721 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -643,7 +643,7 @@ public class FileNodeManager implements IStatistic, IService {
try {
LOGGER.debug("Get the FileNodeProcessor: filenode is {}, begin query.",
fileNodeProcessor.getProcessorName());
- return fileNodeProcessor.addMultiPassLock();
+ return fileNodeProcessor.addMultiPassCount();
} finally {
fileNodeProcessor.writeUnlock();
}
@@ -694,7 +694,11 @@ public class FileNodeManager implements IStatistic, IService {
try {
LOGGER.debug("Get the FileNodeProcessor: {} end query.",
fileNodeProcessor.getProcessorName());
- fileNodeProcessor.removeMultiPassLock(token);
+ fileNodeProcessor.decreaseMultiPassCount(token);
+ } catch (FileNodeProcessorException e) {
+ throw new FileNodeManagerException(String
+ .format("FileNodeProcessor of [%s] meets error when ending query",
+ fileNodeProcessor.getProcessorName()), e);
} finally {
fileNodeProcessor.writeUnlock();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index d97594c..b4920fa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -40,10 +40,10 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -129,22 +129,37 @@ public class FileNodeProcessor extends Processor implements IStatistic {
private TsFileResource currentTsFileResource;
private List<TsFileResource> newFileNodes;
private FileNodeProcessorStatus isMerging;
- // this is used when work->merge operation
+
+ /**
+ * this is used when work->merge operation
+ */
private int numOfMergeFile;
private FileNodeProcessorStore fileNodeProcessorStore;
private String fileNodeRestoreFilePath;
private final Object fileNodeRestoreLock = new Object();
- // last merge time
+
+ /**
+ * last merge time
+ */
private long lastMergeTime = -1;
private BufferWriteProcessor bufferWriteProcessor = null;
private OverflowProcessor overflowProcessor = null;
private Set<Integer> oldMultiPassTokenSet = null;
private Set<Integer> newMultiPassTokenSet = new HashSet<>();
- private ReadWriteLock oldMultiPassLock = null;
- private ReadWriteLock newMultiPassLock = new ReentrantReadWriteLock(false);
- // system recovery
+
+ /**
+ * lock resource when switching status in merge process
+ */
+ private Lock oldMultiPassLock;
+ private AtomicInteger oldMultiPassCount = null;
+ private AtomicInteger newMultiPassCount = new AtomicInteger(0);
+ /**
+ * system recovery
+ */
private boolean shouldRecovery = false;
- // statistic monitor parameters
+ /**
+ * statistic monitor parameters
+ */
private Map<String, Action> parameters;
private FileSchema fileSchema;
private Action flushFileNodeProcessorAction = () -> {
@@ -250,7 +265,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
"directory {}",
getProcessorName(), restoreFolder.getAbsolutePath());
}
- fileNodeRestoreFilePath = new File(restoreFolder, processorName + RESTORE_FILE_SUFFIX).getPath();
+ fileNodeRestoreFilePath = new File(restoreFolder, processorName + RESTORE_FILE_SUFFIX)
+ .getPath();
try {
fileNodeProcessorStore = readStoreFromDisk();
} catch (FileNodeProcessorException e) {
@@ -713,9 +729,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
/**
* add multiple pass lock.
*/
- public int addMultiPassLock() {
- LOGGER.debug("Add MultiPassLock: read lock newMultiPassLock.");
- newMultiPassLock.readLock().lock();
+ public int addMultiPassCount() {
+ LOGGER.debug("Add MultiPassCount: read lock newMultiPassCount.");
+ newMultiPassCount.incrementAndGet();
while (newMultiPassTokenSet.contains(multiPassLockToken)) {
multiPassLockToken++;
}
@@ -725,22 +741,30 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
/**
- * remove multiple pass lock. TODO: use the return value or remove it.
+ * decrease multiple pass count. TODO: use the return value or remove it.
*/
- public boolean removeMultiPassLock(int token) {
+ public boolean decreaseMultiPassCount(int token) throws FileNodeProcessorException {
if (newMultiPassTokenSet.contains(token)) {
- newMultiPassLock.readLock().unlock();
+ int newMultiPassCountValue = newMultiPassCount.decrementAndGet();
+ if (newMultiPassCountValue < 0) {
+ throw new FileNodeProcessorException(String
+ .format("Remove MultiPassCount error, newMultiPassCount:%d", newMultiPassCountValue));
+ }
newMultiPassTokenSet.remove(token);
- LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token,
+ LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, count:{}", token,
getProcessorName(),
- newMultiPassTokenSet, newMultiPassLock);
+ newMultiPassTokenSet, newMultiPassCount);
return true;
} else if (oldMultiPassTokenSet != null && oldMultiPassTokenSet.contains(token)) {
// remove token first, then unlock
- oldMultiPassLock.readLock().unlock();
+ int oldMultiPassCountValue = oldMultiPassCount.decrementAndGet();
oldMultiPassTokenSet.remove(token);
- LOGGER.debug("Remove multi token:{}, old set:{}, lock:{}", token, oldMultiPassTokenSet,
- oldMultiPassLock);
+ if (oldMultiPassCountValue < 0) {
+ throw new FileNodeProcessorException(String
+ .format("Remove MultiPassCount error, oldMultiPassCount:%d", oldMultiPassCountValue));
+ }
+ LOGGER.debug("Remove multi token:{}, old set:{}, count:{}", token, oldMultiPassTokenSet,
+ oldMultiPassCount);
return true;
} else {
LOGGER.error("remove token error:{},new set:{}, old set:{}", token, newMultiPassTokenSet,
@@ -754,7 +778,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* query data.
*/
public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId,
- QueryContext context) throws FileNodeProcessorException {
+ QueryContext context) throws FileNodeProcessorException {
// query overflow data
MeasurementSchema mSchema;
TSDataType dataType;
@@ -1226,9 +1250,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
writeLock();
try {
oldMultiPassTokenSet = newMultiPassTokenSet;
- oldMultiPassLock = newMultiPassLock;
+ oldMultiPassCount = newMultiPassCount;
+ oldMultiPassLock = new ReentrantLock(false);
newMultiPassTokenSet = new HashSet<>();
- newMultiPassLock = new ReentrantReadWriteLock(false);
+ newMultiPassCount = new AtomicInteger(0);
List<TsFileResource> result = new ArrayList<>();
int beginIndex = 0;
if (needEmpty) {
@@ -1323,11 +1348,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
LOGGER.info("The status of filenode processor {} switches from {} to {}.", getProcessorName(),
FileNodeProcessorStatus.WAITING, FileNodeProcessorStatus.NONE);
- if (oldMultiPassLock != null) {
- LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Lock is {}",
+ if (oldMultiPassCount != null) {
+ LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Count is {}",
oldMultiPassTokenSet,
- oldMultiPassLock);
- oldMultiPassLock.writeLock().lock();
+ oldMultiPassCount);
+ oldMultiPassLock.lock();
}
try {
writeLock();
@@ -1378,9 +1403,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
} finally {
oldMultiPassTokenSet = null;
if (oldMultiPassLock != null) {
- oldMultiPassLock.writeLock().unlock();
+ oldMultiPassLock.unlock();
}
- oldMultiPassLock = null;
+ oldMultiPassCount = null;
}
}
@@ -1485,9 +1510,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
SingleSeriesExpression seriesFilter = new SingleSeriesExpression(path, timeFilter);
- for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource.getOverflowInsertFileList()) {
- FileReaderManager.getInstance().increaseFileReaderReference(overflowInsertFile.getFilePath(),
- false);
+ for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource
+ .getOverflowInsertFileList()) {
+ FileReaderManager.getInstance()
+ .increaseFileReaderReference(overflowInsertFile.getFilePath(),
+ false);
}
IReader seriesReader = SeriesReaderFactory.getInstance()
@@ -1574,8 +1601,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
seriesWriterImpl.writeToFileWriter(mergeFileWriter);
}
} finally {
- for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource.getOverflowInsertFileList()) {
- FileReaderManager.getInstance().decreaseFileReaderReference(overflowInsertFile.getFilePath(),
+ for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource
+ .getOverflowInsertFileList()) {
+ FileReaderManager.getInstance()
+ .decreaseFileReaderReference(overflowInsertFile.getFilePath(),
false);
}
}
@@ -1672,30 +1701,21 @@ public class FileNodeProcessor extends Processor implements IStatistic {
isMerging);
return false;
}
- if (!newMultiPassLock.writeLock().tryLock()) {
- LOGGER.info("The filenode {} can't be closed, because it can't get newMultiPassLock {}",
- getProcessorName(), newMultiPassLock);
+ if (newMultiPassCount.get() != 0) {
+ LOGGER.info("The filenode {} can't be closed, because newMultiPassCount is {}",
+ getProcessorName(), newMultiPassCount);
return false;
}
- try {
- if (oldMultiPassLock == null) {
- return true;
- }
- if (oldMultiPassLock.writeLock().tryLock()) {
- try {
- return true;
- } finally {
- oldMultiPassLock.writeLock().unlock();
- }
- } else {
- LOGGER.info("The filenode {} can't be closed, because it can't get"
- + " oldMultiPassLock {}",
- getProcessorName(), oldMultiPassLock);
- return false;
- }
- } finally {
- newMultiPassLock.writeLock().unlock();
+ if (oldMultiPassCount == null) {
+ return true;
+ }
+ if (oldMultiPassCount.get() == 0) {
+ return true;
+ } else {
+ LOGGER.info("The filenode {} can't be closed, because oldMultiPassCount is {}",
+ getProcessorName(), oldMultiPassCount);
+ return false;
}
}
@@ -1991,8 +2011,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
Objects.equals(overflowProcessor, that.overflowProcessor) &&
Objects.equals(oldMultiPassTokenSet, that.oldMultiPassTokenSet) &&
Objects.equals(newMultiPassTokenSet, that.newMultiPassTokenSet) &&
- Objects.equals(oldMultiPassLock, that.oldMultiPassLock) &&
- Objects.equals(newMultiPassLock, that.newMultiPassLock) &&
+ Objects.equals(oldMultiPassCount, that.oldMultiPassCount) &&
+ Objects.equals(newMultiPassCount, that.newMultiPassCount) &&
Objects.equals(parameters, that.parameters) &&
Objects.equals(fileSchema, that.fileSchema) &&
Objects.equals(flushFileNodeProcessorAction, that.flushFileNodeProcessorAction) &&
@@ -2008,7 +2028,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
emptyTsFileResource, currentTsFileResource, newFileNodes, isMerging,
numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath,
lastMergeTime, bufferWriteProcessor, overflowProcessor, oldMultiPassTokenSet,
- newMultiPassTokenSet, oldMultiPassLock, newMultiPassLock, shouldRecovery, parameters,
+ newMultiPassTokenSet, oldMultiPassCount, newMultiPassCount, shouldRecovery, parameters,
fileSchema, flushFileNodeProcessorAction, bufferwriteFlushAction,
bufferwriteCloseAction, overflowFlushAction, multiPassLockToken);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java
index 1e5e11d..0c15c50 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java
@@ -26,6 +26,10 @@ public class FileNodeManagerException extends Exception {
super();
}
+ public FileNodeManagerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
public FileNodeManagerException(String message) {
super(message);
}