You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/20 08:33:19 UTC
[incubator-iotdb] 02/03: add filenodeManager
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit e89b1b64d7cdf8279f32497c56e3989795624c84
Author: lta <li...@163.com>
AuthorDate: Thu Jun 20 16:32:43 2019 +0800
add filenodeManager
---
.../iotdb/db/engine/UnsealedTsFileProcessorV2.java | 2 +-
.../iotdb/db/engine/filenode/FileNodeManager.java | 59 ++--
.../db/engine/filenodeV2/FileNodeManagerV2.java | 323 ++++++++++++++++++++-
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 121 ++++++--
4 files changed, 443 insertions(+), 62 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
index 7111873..5249560 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
@@ -162,7 +162,7 @@ public class UnsealedTsFileProcessorV2 {
*/
public void flushOneMemTable() throws IOException {
IMemTable memTableToFlush = flushingMemTables.pollFirst();
- // null memtable only appears when calling forceClose()
+ // null memtable only appears when calling asyncForceClose()
if (memTableToFlush != null) {
MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName,
this::releaseFlushedMemTable);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 5dfa089..9e42101 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -930,34 +930,6 @@ public class FileNodeManager implements IStatistic, IService {
}
/**
- * try to setCloseMark the filenode processor. The name of filenode processor is processorName
- */
- private boolean closeOneProcessor(String processorName) throws FileNodeManagerException {
- if (!processorMap.containsKey(processorName)) {
- return true;
- }
-
- Processor processor = processorMap.get(processorName);
- if (processor.tryWriteLock()) {
- try {
- if (processor.canBeClosed()) {
- processor.close();
- return true;
- } else {
- return false;
- }
- } catch (ProcessorException e) {
- LOGGER.error("Close the filenode processor {} error.", processorName, e);
- throw new FileNodeManagerException(e);
- } finally {
- processor.writeUnlock();
- }
- } else {
- return false;
- }
- }
-
- /**
* delete one filenode.
*/
public void deleteOneFileNode(String processorName) throws FileNodeManagerException {
@@ -1096,8 +1068,39 @@ public class FileNodeManager implements IStatistic, IService {
}
}
+
+ /**
+ * try to setCloseMark the filenode processor. The name of filenode processor is processorName
+ * notice: this method has the same function as close()
+ */
+ private boolean closeOneProcessor(String processorName) throws FileNodeManagerException {
+ if (!processorMap.containsKey(processorName)) {
+ return true;
+ }
+
+ Processor processor = processorMap.get(processorName);
+ if (processor.tryWriteLock()) {
+ try {
+ if (processor.canBeClosed()) {
+ processor.close();
+ return true;
+ } else {
+ return false;
+ }
+ } catch (ProcessorException e) {
+ LOGGER.error("Close the filenode processor {} error.", processorName, e);
+ throw new FileNodeManagerException(e);
+ } finally {
+ processor.writeUnlock();
+ }
+ } else {
+ return false;
+ }
+ }
+
/**
* try to setCloseMark the filenode processor.
+ * notice: This method has the same function as closeOneProcessor()
*/
private void close(String processorName) throws FileNodeManagerException {
if (!processorMap.containsKey(processorName)) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index 3d0ad48..d5938dc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -19,23 +19,45 @@
package org.apache.iotdb.db.engine.filenodeV2;
import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
+import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.FileNodeProcessorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.monitor.IStatistic;
+import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FileNodeManagerV2 implements IService {
+public class FileNodeManagerV2 implements IStatistic, IService {
private static final Logger LOGGER = LoggerFactory
.getLogger(org.apache.iotdb.db.engine.filenodeV2.FileNodeManagerV2.class);
@@ -52,7 +74,7 @@ public class FileNodeManagerV2 implements IService {
* This map is used to manage all filenode processor,<br> the key is filenode name which is
* storage group seriesPath.
*/
- private ConcurrentHashMap<String, FileNodeProcessorV2> processorMap;
+ private final ConcurrentHashMap<String, FileNodeProcessorV2> processorMap;
private static final FileNodeManagerV2 INSTANCE = new FileNodeManagerV2();
@@ -65,6 +87,11 @@ public class FileNodeManagerV2 implements IService {
*/
private volatile FileNodeManagerStatus fileNodeManagerStatus = FileNodeManagerStatus.NONE;
+ /**
+ * There is no need to add concurrently
+ **/
+ private HashMap<String, AtomicLong> statParamsHashMap;
+
private enum FileNodeManagerStatus {
NONE, MERGE, CLOSE
}
@@ -93,7 +120,11 @@ public class FileNodeManagerV2 implements IService {
@Override
public void stop() {
-
+ try {
+ syncCloseAllProcessor();
+ } catch (FileNodeManagerException e) {
+ LOGGER.error("Failed to setCloseMark file node manager because .", e);
+ }
}
@Override
@@ -103,7 +134,7 @@ public class FileNodeManagerV2 implements IService {
private FileNodeProcessorV2 getProcessor(String devicePath)
- throws FileNodeManagerException, FileNodeProcessorException {
+ throws FileNodeManagerException {
String filenodeName;
try {
// return the storage group name
@@ -121,8 +152,14 @@ public class FileNodeManagerV2 implements IService {
if (processor == null) {
LOGGER.debug("construct a processor instance, the storage group is {}, Thread is {}",
filenodeName, Thread.currentThread().getId());
- processor = new FileNodeProcessorV2(baseDir, filenodeName);
- processorMap.put(filenodeName, processor);
+ try {
+ processor = new FileNodeProcessorV2(baseDir, filenodeName);
+ } catch (FileNodeProcessorException e) {
+ throw new FileNodeManagerException(e);
+ }
+ synchronized (processorMap) {
+ processorMap.put(filenodeName, processor);
+ }
}
}
}
@@ -130,13 +167,92 @@ public class FileNodeManagerV2 implements IService {
}
+ private void updateStatHashMapWhenFail(TSRecord tsRecord) {
+ statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name())
+ .incrementAndGet();
+ statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name())
+ .addAndGet(tsRecord.dataPointList.size());
+ }
+
+ /**
+ * get stats parameter hash map.
+ *
+ * @return the key represents the params' name, values is AtomicLong type
+ */
+ @Override
+ public Map<String, AtomicLong> getStatParamsHashMap() {
+ return statParamsHashMap;
+ }
+
+ @Override
+ public List<String> getAllPathForStatistic() {
+ List<String> list = new ArrayList<>();
+ for (MonitorConstants.FileNodeManagerStatConstants statConstant :
+ MonitorConstants.FileNodeManagerStatConstants.values()) {
+ list.add(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPARATOR
+ + statConstant.name());
+ }
+ return list;
+ }
+
+ @Override
+ public Map<String, TSRecord> getAllStatisticsValue() {
+ long curTime = System.currentTimeMillis();
+ TSRecord tsRecord = StatMonitor
+ .convertToTSRecord(getStatParamsHashMap(), MonitorConstants.STAT_STORAGE_DELTA_NAME,
+ curTime);
+ HashMap<String, TSRecord> ret = new HashMap<>();
+ ret.put(MonitorConstants.STAT_STORAGE_DELTA_NAME, tsRecord);
+ return ret;
+ }
+
+ /**
+ * Init Stat Metadata.
+ */
+ @Override
+ public void registerStatMetadata() {
+ Map<String, String> hashMap = new HashMap<>();
+ for (MonitorConstants.FileNodeManagerStatConstants statConstant :
+ MonitorConstants.FileNodeManagerStatConstants.values()) {
+ hashMap
+ .put(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPARATOR
+ + statConstant.name(), MonitorConstants.DATA_TYPE_INT64);
+ }
+ StatMonitor.getInstance().registerStatStorageGroup(hashMap);
+ }
+
+ private void updateStat(boolean isMonitor, TSRecord tsRecord) {
+ if (!isMonitor) {
+ statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS.name())
+ .addAndGet(tsRecord.dataPointList.size());
+ }
+ }
+
+ /**
+ * This function is just for unit test.
+ */
+ public synchronized void resetFileNodeManager() {
+ for (String key : statParamsHashMap.keySet()) {
+ statParamsHashMap.put(key, new AtomicLong());
+ }
+ processorMap.clear();
+ }
+
+
+
/**
* insert TsRecord into storage group.
*
* @param tsRecord input Data
+ * @param isMonitor if true, the insertion is done by StatMonitor and the statistic Info will not
+ * be recorded. if false, the statParamsHashMap will be updated.
* @return an int value represents the insert type, 0: failed; 1: overflow; 2: bufferwrite
*/
- public boolean insert(TSRecord tsRecord) {
+ public int insert(TSRecord tsRecord, boolean isMonitor) throws FileNodeManagerException {
+
+ checkTimestamp(tsRecord);
+
+ updateStat(isMonitor, tsRecord);
FileNodeProcessorV2 fileNodeProcessor;
try {
@@ -144,9 +260,10 @@ public class FileNodeManagerV2 implements IService {
} catch (Exception e) {
LOGGER.warn("get FileNodeProcessor of device {} failed, because {}", tsRecord.deviceId,
e.getMessage(), e);
- return false;
+ throw new FileNodeManagerException(e);
}
+ // TODO monitor: update statistics
return fileNodeProcessor.insert(tsRecord);
}
@@ -169,4 +286,194 @@ public class FileNodeManagerV2 implements IService {
}
}
+ private void checkTimestamp(TSRecord tsRecord) throws FileNodeManagerException {
+ if (tsRecord.time < 0) {
+ LOGGER.error("The insert time lt 0, {}.", tsRecord);
+ throw new FileNodeManagerException("The insert time lt 0, the tsrecord is " + tsRecord);
+ }
+ }
+
+ /**
+ * recovery the filenode processor.
+ */
+ public void recovery() {
+ // TODO
+ }
+
+
+ private void writeLog(TSRecord tsRecord, boolean isMonitor, WriteLogNode logNode)
+ throws FileNodeManagerException {
+ // TODO
+ }
+
+
+ /**
+ * update data.
+ */
+ public void update(String deviceId, String measurementId, long startTime, long endTime,
+ TSDataType type, String v) {
+ // TODO
+ }
+
+ /**
+ * delete data.
+ */
+ public void delete(String deviceId, String measurementId, long timestamp)
+ throws FileNodeManagerException {
+ // TODO
+ }
+
+ private void delete(String processorName,
+ Iterator<Entry<String, FileNodeProcessor>> processorIterator)
+ throws FileNodeManagerException {
+ // TODO
+ }
+
+
+ /**
+ * begin query.
+ *
+ * @param deviceId queried deviceId
+ * @return a query token for the device.
+ */
+ public int beginQuery(String deviceId) throws FileNodeManagerException {
+ // TODO
+ return -1;
+ }
+
+ /**
+ * end query.
+ */
+ public void endQuery(String deviceId, int token) throws FileNodeManagerException {
+ // TODO
+ }
+
+ /**
+ * query data.
+ */
+ public QueryDataSourceV2 query(SingleSeriesExpression seriesExpression, QueryContext context)
+ throws FileNodeManagerException {
+ String deviceId = seriesExpression.getSeriesPath().getDevice();
+ String measurementId = seriesExpression.getSeriesPath().getMeasurement();
+ FileNodeProcessorV2 fileNodeProcessor = getProcessor(deviceId);
+ LOGGER.debug("Get the FileNodeProcessor: filenode is {}, query.",
+ fileNodeProcessor.getStorageGroupName());
+ return fileNodeProcessor.query(deviceId, measurementId);
+ }
+
+ /**
+ * Append one specified tsfile to the storage group. <b>This method is only provided for
+ * transmission module</b>
+ *
+ * @param fileNodeName the seriesPath of storage group
+ * @param appendFile the appended tsfile information
+ */
+ public boolean appendFileToFileNode(String fileNodeName, TsFileResource appendFile,
+ String appendFilePath) throws FileNodeManagerException {
+ // TODO
+ return true;
+ }
+
+ /**
+ * get all overlap tsfiles which are conflict with the appendFile.
+ *
+ * @param fileNodeName the seriesPath of storage group
+ * @param appendFile the appended tsfile information
+ */
+ public List<String> getOverlapFilesFromFileNode(String fileNodeName, TsFileResource appendFile,
+ String uuid) throws FileNodeManagerException {
+ // TODO
+ return null;
+ }
+
+
+ /**
+ * merge all overflowed filenode.
+ *
+ * @throws FileNodeManagerException FileNodeManagerException
+ */
+ public void mergeAll() throws FileNodeManagerException {
+ // TODO
+ }
+
+ /**
+ * try to setCloseMark the filenode processor. The name of filenode processor is processorName
+ */
+ private boolean tryToCloseFileNodeProcessor(String processorName) throws FileNodeManagerException {
+ // TODO
+ return false;
+ }
+
+ /**
+ * Force to delete the filenode processor.
+ */
+ public void deleteOneFileNode(String processorName) throws FileNodeManagerException {
+ if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
+ return;
+ }
+
+ fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
+ try {
+ if (processorMap.containsKey(processorName)) {
+ deleteFileNodeBlocked(processorName);
+ }
+ } catch (IOException e) {
+ LOGGER.error("Delete the filenode processor {} error.", processorName, e);
+ throw new FileNodeManagerException(e);
+ } finally {
+ fileNodeManagerStatus = FileNodeManagerStatus.NONE;
+ }
+ }
+
+ private void deleteFileNodeBlocked(String processorName) throws IOException {
+ LOGGER.info("Forced to delete the filenode processor {}", processorName);
+ FileNodeProcessorV2 processor = processorMap.get(processorName);
+ processor.syncCloseFileNode(() -> {
+ String fileNodePath = IoTDBDescriptor.getInstance().getConfig().getFileNodeDir();
+ fileNodePath = FilePathUtils.regularizePath(fileNodePath) + processorName;
+ try {
+ FileUtils.deleteDirectory(new File(fileNodePath));
+ } catch (IOException e) {
+ LOGGER.error("Delete tsfiles failed", e);
+ }
+ synchronized (processorMap) {
+ processorMap.remove(processorName);
+ }
+ return true;
+ });
+ }
+
+
+ /**
+ * add time series.
+ */
+ public void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding,
+ CompressionType compressor,
+ Map<String, String> props) throws FileNodeManagerException {
+ FileNodeProcessorV2 fileNodeProcessor = getProcessor(path.getFullPath());
+ fileNodeProcessor.addTimeSeries(path.getMeasurement(), dataType, encoding, compressor, props);
+ }
+
+
+ /**
+ * delete all filenode.
+ */
+ public synchronized boolean deleteAll() throws FileNodeManagerException {
+ LOGGER.info("Start deleting all filenode");
+ // TODO
+ return true;
+ }
+
+ /**
+ * Close all file node processors by calling asyncForceClose() on each of them.
+ */
+ public void syncCloseAllProcessor() throws FileNodeManagerException {
+ LOGGER.info("Start closing all filenode processor");
+ synchronized (processorMap){
+ for(FileNodeProcessorV2 processor: processorMap.values()){
+ processor.asyncForceClose();
+ }
+ }
+ }
+
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 6dedfa8..ce8fbba 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -23,12 +23,13 @@ import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.UnsealedTsFileProcessorV2;
@@ -66,12 +67,12 @@ public class FileNodeProcessorV2 {
// includes sealed and unsealed sequnce tsfiles
private List<TsFileResourceV2> sequenceFileList = new ArrayList<>();
- private UnsealedTsFileProcessorV2 workUnsealedSequenceTsFileProcessor = null;
+ private UnsealedTsFileProcessorV2 workSequenceTsFileProcessor = null;
private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
// includes sealed and unsealed unsequnce tsfiles
private List<TsFileResourceV2> unSequenceFileList = new ArrayList<>();
- private UnsealedTsFileProcessorV2 workUnsealedUnSequenceTsFileProcessor = null;
+ private UnsealedTsFileProcessorV2 workUnSequenceTsFileProcessor = null;
private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
/**
@@ -88,12 +89,20 @@ public class FileNodeProcessorV2 {
private final ReadWriteLock lock;
+ private Condition closeFileNodeCondition;
+
+ /**
+ * Mark whether to close file node
+ */
+ private volatile boolean toBeClosed;
+
private VersionController versionController;
public FileNodeProcessorV2(String absoluteBaseDir, String storageGroupName)
throws FileNodeProcessorException {
this.storageGroupName = storageGroupName;
lock = new ReentrantReadWriteLock();
+ closeFileNodeCondition = lock.writeLock().newCondition();
File storageGroupDir = new File(absoluteBaseDir, storageGroupName);
if (!storageGroupDir.exists()) {
@@ -160,29 +169,40 @@ public class FileNodeProcessorV2 {
}
}
- public boolean insert(TSRecord tsRecord) {
+ /**
+ *
+ * @param tsRecord
+ * @return -1: failed, 1: Overflow, 2:Bufferwrite
+ */
+ public int insert(TSRecord tsRecord) {
lock.writeLock().lock();
- boolean result;
+ int insertResult;
try {
+ if(toBeClosed){
+ return -1;
+ }
// init map
latestTimeForEachDevice.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
latestFlushedTimeForEachDevice.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
+ boolean result;
// write to sequence or unsequence file
if (tsRecord.time > latestFlushedTimeForEachDevice.get(tsRecord.deviceId)) {
- result = writeUnsealedDataFile(workUnsealedSequenceTsFileProcessor, tsRecord, true);
+ result = writeUnsealedDataFile(workSequenceTsFileProcessor, tsRecord, true);
+ insertResult = result ? 1 : -1;
} else {
- result = writeUnsealedDataFile(workUnsealedUnSequenceTsFileProcessor, tsRecord, false);
+ result = writeUnsealedDataFile(workUnSequenceTsFileProcessor, tsRecord, false);
+ insertResult = result ? 2 : -1;
}
} catch (Exception e) {
LOGGER.error("insert tsRecord to unsealed data file failed, because {}", e.getMessage(), e);
- result = false;
+ insertResult = -1;
} finally {
lock.writeLock().unlock();
}
- return result;
+ return insertResult;
}
private boolean writeUnsealedDataFile(UnsealedTsFileProcessorV2 unsealedTsFileProcessor,
@@ -282,10 +302,10 @@ public class FileNodeProcessorV2 {
if (unsealedTsFileProcessor.shouldClose()) {
if (sequence) {
closingSequenceTsFileProcessor.add(unsealedTsFileProcessor);
- workUnsealedSequenceTsFileProcessor = null;
+ workSequenceTsFileProcessor = null;
} else {
closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor);
- workUnsealedUnSequenceTsFileProcessor = null;
+ workUnSequenceTsFileProcessor = null;
}
unsealedTsFileProcessor.setCloseMark();
}
@@ -304,29 +324,80 @@ public class FileNodeProcessorV2 {
*/
// TODO please consider concurrency with query and write method.
private void closeUnsealedTsFileProcessor(UnsealedTsFileProcessorV2 bufferWriteProcessor) {
- closingSequenceTsFileProcessor.remove(bufferWriteProcessor);
- // end time with one start time
- Map<String, Long> endTimeMap = new HashMap<>();
- TsFileResourceV2 resource = workUnsealedSequenceTsFileProcessor.getTsFileResource();
- synchronized (resource) {
- for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
- String deviceId = startTime.getKey();
- endTimeMap.put(deviceId, latestTimeForEachDevice.get(deviceId));
+ lock.writeLock().unlock();
+ try {
+ closingSequenceTsFileProcessor.remove(bufferWriteProcessor);
+ // end time with one start time
+ Map<String, Long> endTimeMap = new HashMap<>();
+ TsFileResourceV2 resource = workSequenceTsFileProcessor.getTsFileResource();
+ synchronized (resource) {
+ for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
+ String deviceId = startTime.getKey();
+ endTimeMap.put(deviceId, latestTimeForEachDevice.get(deviceId));
+ }
+ resource.setEndTimeMap(endTimeMap);
}
- resource.setEndTimeMap(endTimeMap);
+ closeFileNodeCondition.signal();
+ }finally {
+ lock.writeLock().unlock();
}
}
- public void forceClose() {
+ public void asyncForceClose() {
lock.writeLock().lock();
try {
- if (workUnsealedSequenceTsFileProcessor != null) {
- closingSequenceTsFileProcessor.add(workUnsealedSequenceTsFileProcessor);
- workUnsealedSequenceTsFileProcessor.forceClose();
- workUnsealedSequenceTsFileProcessor = null;
+ if (workSequenceTsFileProcessor != null) {
+ closingSequenceTsFileProcessor.add(workSequenceTsFileProcessor);
+ workSequenceTsFileProcessor.forceClose();
+ workSequenceTsFileProcessor = null;
+ }
+ if (workUnSequenceTsFileProcessor != null) {
+ closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
+ workUnSequenceTsFileProcessor.forceClose();
+ workUnSequenceTsFileProcessor = null;
}
} finally {
lock.writeLock().unlock();
}
}
+
+ /**
+ * Block this method until this file node can be closed.
+ */
+ public void syncCloseFileNode(Supplier<Boolean> removeProcessorFromManager){
+ lock.writeLock().lock();
+ try {
+ asyncForceClose();
+ toBeClosed = true;
+ while (true) {
+ if (unSequenceFileList.isEmpty() && sequenceFileList.isEmpty()
+ && workSequenceTsFileProcessor == null && workUnSequenceTsFileProcessor == null) {
+ removeProcessorFromManager.get();
+ break;
+ }
+ closeFileNodeCondition.await();
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("CloseFileNodeConditon occurs error while waiting for closing the file node {}",
+ storageGroupName, e);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public UnsealedTsFileProcessorV2 getWorkSequenceTsFileProcessor() {
+ return workSequenceTsFileProcessor;
+ }
+
+ public UnsealedTsFileProcessorV2 getWorkUnSequenceTsFileProcessor() {
+ return workUnSequenceTsFileProcessor;
+ }
+
+ public String getStorageGroupName() {
+ return storageGroupName;
+ }
+
+ public int getClosingProcessorSize(){
+ return unSequenceFileList.size() + sequenceFileList.size();
+ }
}