You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/07 15:28:29 UTC
[incubator-iotdb] branch master updated: Refactor query resource
count (#168)
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4f554c7 Refactor query resource count (#168)
4f554c7 is described below
commit 4f554c746d81b5b127afd3e79850de04c88f56ab
Author: Tianan Li <li...@163.com>
AuthorDate: Tue May 7 23:28:25 2019 +0800
Refactor query resource count (#168)
* refactor query resource count
* format code
* fix a bug
* remove lock
---
.../iotdb/db/engine/filenode/FileNodeManager.java | 18 +--
.../db/engine/filenode/FileNodeProcessor.java | 156 ++++++++++++---------
.../db/exception/FileNodeManagerException.java | 4 +
3 files changed, 106 insertions(+), 72 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 22778bb..516abdc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -190,10 +190,7 @@ public class FileNodeManager implements IStatistic, IService {
}
/**
- *
* @param filenodeName storage name, e.g., root.a.b
- * @return
- * @throws FileNodeManagerException
*/
private FileNodeProcessor constructNewProcessor(String filenodeName)
throws FileNodeManagerException {
@@ -429,7 +426,7 @@ public class FileNodeManager implements IStatistic, IService {
fileNodeProcessor.setIntervalFileNodeStartTime(deviceId);
fileNodeProcessor.setLastUpdateTime(deviceId, timestamp);
try {
- if(!bufferWriteProcessor.write(tsRecord)) {
+ if (!bufferWriteProcessor.write(tsRecord)) {
// undo time update
fileNodeProcessor.setIntervalFileNodeStartTime(deviceId, prevStartTime);
fileNodeProcessor.setLastUpdateTime(deviceId, prevUpdateTime);
@@ -639,7 +636,8 @@ public class FileNodeManager implements IStatistic, IService {
/**
* begin query.
- * @param deviceId queried deviceId
+ *
+ * @param deviceId queried deviceId
* @return a query token for the device.
*/
public int beginQuery(String deviceId) throws FileNodeManagerException {
@@ -647,7 +645,7 @@ public class FileNodeManager implements IStatistic, IService {
try {
LOGGER.debug("Get the FileNodeProcessor: filenode is {}, begin query.",
fileNodeProcessor.getProcessorName());
- return fileNodeProcessor.addMultiPassLock();
+ return fileNodeProcessor.addMultiPassCount();
} finally {
fileNodeProcessor.writeUnlock();
}
@@ -698,7 +696,10 @@ public class FileNodeManager implements IStatistic, IService {
try {
LOGGER.debug("Get the FileNodeProcessor: {} end query.",
fileNodeProcessor.getProcessorName());
- fileNodeProcessor.removeMultiPassLock(token);
+ fileNodeProcessor.decreaseMultiPassCount(token);
+ } catch (FileNodeProcessorException e) {
+ LOGGER.error("Failed to end query: the deviceId {}, token {}.", deviceId, token, e);
+ throw new FileNodeManagerException(e);
} finally {
fileNodeProcessor.writeUnlock();
}
@@ -957,7 +958,8 @@ public class FileNodeManager implements IStatistic, IService {
/**
* add time series.
*/
- public void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding, CompressionType compressor,
+ public void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding,
+ CompressionType compressor,
Map<String, String> props) throws FileNodeManagerException {
FileNodeProcessor fileNodeProcessor = getProcessor(path.getFullPath(), true);
try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 6428fc6..209c1f1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -38,12 +38,12 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -129,22 +129,41 @@ public class FileNodeProcessor extends Processor implements IStatistic {
private TsFileResource currentTsFileResource;
private List<TsFileResource> newFileNodes;
private FileNodeProcessorStatus isMerging;
- // this is used when work->merge operation
+
+ /**
+ * this is used when work->merge operation
+ */
private int numOfMergeFile;
private FileNodeProcessorStore fileNodeProcessorStore;
private String fileNodeRestoreFilePath;
private final Object fileNodeRestoreLock = new Object();
- // last merge time
+
+ /**
+ * last merge time
+ */
private long lastMergeTime = -1;
private BufferWriteProcessor bufferWriteProcessor = null;
private OverflowProcessor overflowProcessor = null;
private Set<Integer> oldMultiPassTokenSet = null;
private Set<Integer> newMultiPassTokenSet = new HashSet<>();
- private ReadWriteLock oldMultiPassLock = null;
- private ReadWriteLock newMultiPassLock = new ReentrantReadWriteLock(false);
- // system recovery
+
+ /**
+ * Represent the number of old queries that have not ended.
+ * This parameter only decreases but not increase.
+ */
+ private CountDownLatch oldMultiPassCount = null;
+
+ /**
+ * Represent the number of new queries that have not ended.
+ */
+ private AtomicInteger newMultiPassCount = new AtomicInteger(0);
+ /**
+ * system recovery
+ */
private boolean shouldRecovery = false;
- // statistic monitor parameters
+ /**
+ * statistic monitor parameters
+ */
private Map<String, Action> parameters;
private FileSchema fileSchema;
private Action flushFileNodeProcessorAction = () -> {
@@ -250,7 +269,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
"directory {}",
getProcessorName(), restoreFolder.getAbsolutePath());
}
- fileNodeRestoreFilePath = new File(restoreFolder, processorName + RESTORE_FILE_SUFFIX).getPath();
+ fileNodeRestoreFilePath = new File(restoreFolder, processorName + RESTORE_FILE_SUFFIX)
+ .getPath();
try {
fileNodeProcessorStore = readStoreFromDisk();
} catch (FileNodeProcessorException e) {
@@ -518,9 +538,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
} catch (BufferWriteProcessorException e) {
throw new FileNodeProcessorException(String
.format("The filenode processor %s failed to get the bufferwrite processor.",
- processorName),e);
+ processorName), e);
}
- } else if (bufferWriteProcessor.isClosed()){
+ } else if (bufferWriteProcessor.isClosed()) {
try {
bufferWriteProcessor.reopen(insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
+ System.currentTimeMillis());
@@ -543,7 +563,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
overflowProcessor = new OverflowProcessor(processorName, params, fileSchema,
versionController);
- } else if (overflowProcessor.isClosed()){
+ } else if (overflowProcessor.isClosed()) {
overflowProcessor.reopen();
}
return overflowProcessor;
@@ -704,9 +724,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
/**
* add multiple pass lock.
*/
- public int addMultiPassLock() {
- LOGGER.debug("Add MultiPassLock: read lock newMultiPassLock.");
- newMultiPassLock.readLock().lock();
+ public int addMultiPassCount() {
+ LOGGER.debug("Add MultiPassCount: read lock newMultiPassCount.");
+ newMultiPassCount.incrementAndGet();
while (newMultiPassTokenSet.contains(multiPassLockToken)) {
multiPassLockToken++;
}
@@ -716,22 +736,31 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
/**
- * remove multiple pass lock. TODO: use the return value or remove it.
+ * decrease multiple pass count. TODO: use the return value or remove it.
*/
- public boolean removeMultiPassLock(int token) {
+ public boolean decreaseMultiPassCount(int token) throws FileNodeProcessorException {
if (newMultiPassTokenSet.contains(token)) {
- newMultiPassLock.readLock().unlock();
+ int newMultiPassCountValue = newMultiPassCount.decrementAndGet();
+ if (newMultiPassCountValue < 0) {
+ throw new FileNodeProcessorException(String
+ .format("Remove MultiPassCount error, newMultiPassCount:%d", newMultiPassCountValue));
+ }
newMultiPassTokenSet.remove(token);
- LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token,
+ LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, count:{}", token,
getProcessorName(),
- newMultiPassTokenSet, newMultiPassLock);
+ newMultiPassTokenSet, newMultiPassCount);
return true;
} else if (oldMultiPassTokenSet != null && oldMultiPassTokenSet.contains(token)) {
// remove token first, then unlock
- oldMultiPassLock.readLock().unlock();
oldMultiPassTokenSet.remove(token);
- LOGGER.debug("Remove multi token:{}, old set:{}, lock:{}", token, oldMultiPassTokenSet,
- oldMultiPassLock);
+ oldMultiPassCount.countDown();
+ long oldMultiPassCountValue = oldMultiPassCount.getCount();
+ if (oldMultiPassCountValue < 0) {
+ throw new FileNodeProcessorException(String
+ .format("Remove MultiPassCount error, oldMultiPassCount:%d", oldMultiPassCountValue));
+ }
+ LOGGER.debug("Remove multi token:{}, old set:{}, count:{}", token, oldMultiPassTokenSet,
+ oldMultiPassCount.getCount());
return true;
} else {
LOGGER.error("remove token error:{},new set:{}, old set:{}", token, newMultiPassTokenSet,
@@ -745,7 +774,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* query data.
*/
public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId,
- QueryContext context) throws FileNodeProcessorException {
+ QueryContext context) throws FileNodeProcessorException {
// query overflow data
MeasurementSchema mSchema;
TSDataType dataType;
@@ -1221,9 +1250,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
writeLock();
try {
oldMultiPassTokenSet = newMultiPassTokenSet;
- oldMultiPassLock = newMultiPassLock;
+ oldMultiPassCount = new CountDownLatch(newMultiPassCount.get());
newMultiPassTokenSet = new HashSet<>();
- newMultiPassLock = new ReentrantReadWriteLock(false);
+ newMultiPassCount = new AtomicInteger(0);
List<TsFileResource> result = new ArrayList<>();
int beginIndex = 0;
if (needEmpty) {
@@ -1318,12 +1347,20 @@ public class FileNodeProcessor extends Processor implements IStatistic {
LOGGER.info("The status of filenode processor {} switches from {} to {}.", getProcessorName(),
FileNodeProcessorStatus.WAITING, FileNodeProcessorStatus.NONE);
- if (oldMultiPassLock != null) {
- LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Lock is {}",
+ if (oldMultiPassCount != null) {
+ LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Count is {}",
oldMultiPassTokenSet,
- oldMultiPassLock);
- oldMultiPassLock.writeLock().lock();
+ oldMultiPassCount);
+ try {
+ oldMultiPassCount.await();
+ } catch (InterruptedException e) {
+ LOGGER.info(
+ "The filenode processor {} encountered an error when it waits for all old queries over.",
+ getProcessorName());
+ throw new FileNodeProcessorException(e);
+ }
}
+
try {
writeLock();
try {
@@ -1372,10 +1409,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
} finally {
oldMultiPassTokenSet = null;
- if (oldMultiPassLock != null) {
- oldMultiPassLock.writeLock().unlock();
- }
- oldMultiPassLock = null;
+ oldMultiPassCount = null;
}
}
@@ -1478,9 +1512,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
SingleSeriesExpression seriesFilter = new SingleSeriesExpression(path, timeFilter);
- for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource.getOverflowInsertFileList()) {
- FileReaderManager.getInstance().increaseFileReaderReference(overflowInsertFile.getFilePath(),
- false);
+ for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource
+ .getOverflowInsertFileList()) {
+ FileReaderManager.getInstance()
+ .increaseFileReaderReference(overflowInsertFile.getFilePath(),
+ false);
}
IReader seriesReader = SeriesReaderFactory.getInstance()
@@ -1566,8 +1602,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
seriesWriterImpl.writeToFileWriter(mergeFileWriter);
}
} finally {
- for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource.getOverflowInsertFileList()) {
- FileReaderManager.getInstance().decreaseFileReaderReference(overflowInsertFile.getFilePath(),
+ for (OverflowInsertFile overflowInsertFile : overflowSeriesDataSource
+ .getOverflowInsertFileList()) {
+ FileReaderManager.getInstance()
+ .decreaseFileReaderReference(overflowInsertFile.getFilePath(),
false);
}
}
@@ -1664,31 +1702,21 @@ public class FileNodeProcessor extends Processor implements IStatistic {
isMerging);
return false;
}
- if (!newMultiPassLock.writeLock().tryLock()) {
- LOGGER.warn(
- "The filenode {} can't be closed, because it can't get newMultiPassLock {}. The newMultiPassTokenSet is {}",
- getProcessorName(), newMultiPassLock, newMultiPassTokenSet);
+ if (newMultiPassCount.get() != 0) {
+ LOGGER.warn("The filenode {} can't be closed, because newMultiPassCount is {}. The newMultiPassTokenSet is {}",
+ getProcessorName(), newMultiPassCount, newMultiPassTokenSet);
return false;
}
- try {
- if (oldMultiPassLock == null) {
- return true;
- }
- if (oldMultiPassLock.writeLock().tryLock()) {
- try {
- return true;
- } finally {
- oldMultiPassLock.writeLock().unlock();
- }
- } else {
- LOGGER.info("The filenode {} can't be closed, because it can't get"
- + " oldMultiPassLock {}",
- getProcessorName(), oldMultiPassLock);
- return false;
- }
- } finally {
- newMultiPassLock.writeLock().unlock();
+ if (oldMultiPassCount == null) {
+ return true;
+ }
+ if (oldMultiPassCount.getCount() == 0) {
+ return true;
+ } else {
+ LOGGER.info("The filenode {} can't be closed, because oldMultiPassCount is {}",
+ getProcessorName(), oldMultiPassCount.getCount());
+ return false;
}
}
@@ -1988,8 +2016,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
Objects.equals(overflowProcessor, that.overflowProcessor) &&
Objects.equals(oldMultiPassTokenSet, that.oldMultiPassTokenSet) &&
Objects.equals(newMultiPassTokenSet, that.newMultiPassTokenSet) &&
- Objects.equals(oldMultiPassLock, that.oldMultiPassLock) &&
- Objects.equals(newMultiPassLock, that.newMultiPassLock) &&
+ Objects.equals(oldMultiPassCount, that.oldMultiPassCount) &&
+ Objects.equals(newMultiPassCount, that.newMultiPassCount) &&
Objects.equals(parameters, that.parameters) &&
Objects.equals(fileSchema, that.fileSchema) &&
Objects.equals(flushFileNodeProcessorAction, that.flushFileNodeProcessorAction) &&
@@ -2005,7 +2033,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
emptyTsFileResource, currentTsFileResource, newFileNodes, isMerging,
numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath,
lastMergeTime, bufferWriteProcessor, overflowProcessor, oldMultiPassTokenSet,
- newMultiPassTokenSet, oldMultiPassLock, newMultiPassLock, shouldRecovery, parameters,
+ newMultiPassTokenSet, oldMultiPassCount, newMultiPassCount, shouldRecovery, parameters,
fileSchema, flushFileNodeProcessorAction, bufferwriteFlushAction,
bufferwriteCloseAction, overflowFlushAction, multiPassLockToken);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java
index 1e5e11d..0c15c50 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeManagerException.java
@@ -26,6 +26,10 @@ public class FileNodeManagerException extends Exception {
super();
}
+ public FileNodeManagerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
public FileNodeManagerException(String message) {
super(message);
}