You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/04/18 14:27:09 UTC
[incubator-iotdb] branch add_disabled_mem_control updated: reuse BufferWriteProcessor and OverflowProcessor to reduce memory waste.
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch add_disabled_mem_control
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/add_disabled_mem_control by this push:
new 2b5016f reuse BufferWriteProcessor and OverflowProcessor to reduce memory waste.
2b5016f is described below
commit 2b5016f5c2f8078a600e0bfe491df75b6d2d4d7b
Author: 江天 <jt...@163.com>
AuthorDate: Thu Apr 18 22:25:01 2019 +0800
reuse BufferWriteProcessor and OverflowProcessor to reduce memory waste.
---
.../engine/bufferwrite/BufferWriteProcessor.java | 133 +++++++++++----------
.../db/engine/filenode/FileNodeProcessor.java | 70 +++++------
.../db/engine/overflow/io/OverflowProcessor.java | 74 ++++++++++--
.../db/exception/FileNodeProcessorException.java | 4 +
4 files changed, 167 insertions(+), 114 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 2bc394b..a0932ee 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -84,6 +84,9 @@ public class BufferWriteProcessor extends Processor {
private WriteLogNode logNode;
private VersionController versionController;
+ private boolean isClosed = false;
+ private boolean isFlush = false;
+
/**
* constructor of BufferWriteProcessor.
*
@@ -100,13 +103,37 @@ public class BufferWriteProcessor extends Processor {
super(processorName);
this.fileSchema = fileSchema;
this.baseDir = baseDir;
- this.fileName = fileName;
+ bufferwriteFlushAction = parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
+ bufferwriteCloseAction = parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
+ filenodeFlushAction = parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
+
+
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+ try {
+ logNode = MultiFileLogNodeManager.getInstance().getNode(
+ processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
+ getBufferwriteRestoreFilePath(),
+ FileNodeManager.getInstance().getRestoreFilePath(processorName));
+ } catch (IOException e) {
+ throw new BufferWriteProcessorException(e);
+ }
+ }
+ this.versionController = versionController;
+
+ reopen(fileName);
+ }
+
+ public void reopen(String fileName) throws BufferWriteProcessorException {
+ if (!isClosed) {
+ return;
+ }
+ this.fileName = fileName;
String bDir = baseDir;
if (bDir.length() > 0 && bDir.charAt(bDir.length() - 1) != File.separatorChar) {
bDir = bDir + File.separatorChar;
}
- String dataDirPath = bDir + processorName;
+ String dataDirPath = bDir + getProcessorName();
File dataDir = new File(dataDirPath);
if (!dataDir.exists()) {
dataDir.mkdirs();
@@ -114,29 +141,23 @@ public class BufferWriteProcessor extends Processor {
dataDirPath);
}
this.insertFilePath = new File(dataDir, fileName).getPath();
- bufferWriteRelativePath = processorName + File.separatorChar + fileName;
+ bufferWriteRelativePath = getProcessorName() + File.separatorChar + fileName;
try {
- writer = new RestorableTsFileIOWriter(processorName, insertFilePath);
+ writer = new RestorableTsFileIOWriter(getProcessorName(), insertFilePath);
} catch (IOException e) {
throw new BufferWriteProcessorException(e);
}
+ if (workMemTable == null) {
+ workMemTable = new PrimitiveMemTable();
+ } else {
+ workMemTable.clear();
+ }
+ }
- bufferwriteFlushAction = parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
- bufferwriteCloseAction = parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
- filenodeFlushAction = parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
- workMemTable = new PrimitiveMemTable();
-
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- try {
- logNode = MultiFileLogNodeManager.getInstance().getNode(
- processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
- getBufferwriteRestoreFilePath(),
- FileNodeManager.getInstance().getRestoreFilePath(processorName));
- } catch (IOException e) {
- throw new BufferWriteProcessorException(e);
- }
+ public void checkOpen() throws BufferWriteProcessorException {
+ if (isClosed) {
+ throw new BufferWriteProcessorException("BufferWriteProcessor already closed");
}
- this.versionController = versionController;
}
/**
@@ -153,6 +174,7 @@ public class BufferWriteProcessor extends Processor {
public boolean write(String deviceId, String measurementId, long timestamp, TSDataType dataType,
String value)
throws BufferWriteProcessorException {
+ checkOpen();
TSRecord record = new TSRecord(timestamp, deviceId);
DataPoint dataPoint = DataPoint.getDataPoint(dataType, measurementId, value);
record.addTuple(dataPoint);
@@ -168,6 +190,7 @@ public class BufferWriteProcessor extends Processor {
* @throws BufferWriteProcessorException if a flushing operation occurs and failed.
*/
public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException {
+ checkOpen();
long memUsage = MemUtils.getRecordSize(tsRecord);
BasicMemController.UsageLevel level = BasicMemController.getInstance()
.acquireUsage(this, memUsage);
@@ -233,7 +256,9 @@ public class BufferWriteProcessor extends Processor {
* @return corresponding chunk data and chunk metadata in memory
*/
public Pair<ReadOnlyMemChunk, List<ChunkMetaData>> queryBufferWriteData(String deviceId,
- String measurementId, TSDataType dataType, Map<String, String> props) {
+ String measurementId, TSDataType dataType, Map<String, String> props)
+ throws BufferWriteProcessorException {
+ checkOpen();
flushQueryLock.lock();
try {
MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
@@ -255,10 +280,10 @@ public class BufferWriteProcessor extends Processor {
private void switchWorkToFlush() {
flushQueryLock.lock();
try {
- if (flushMemTable == null) {
- flushMemTable = workMemTable;
- workMemTable = new PrimitiveMemTable();
- }
+ IMemTable temp = flushMemTable == null ? new PrimitiveMemTable() : flushMemTable;
+ flushMemTable = workMemTable;
+ workMemTable = temp;
+ isFlush = true;
} finally {
flushQueryLock.unlock();
}
@@ -268,8 +293,8 @@ public class BufferWriteProcessor extends Processor {
flushQueryLock.lock();
try {
flushMemTable.clear();
- flushMemTable = null;
writer.appendMetadata();
+ isClosed = false;
} finally {
flushQueryLock.unlock();
}
@@ -329,6 +354,9 @@ public class BufferWriteProcessor extends Processor {
// keyword synchronized is added in this method, so that only one flush task can be submitted now.
@Override
public synchronized Future<Boolean> flush() throws IOException {
+ if (isClosed) {
+ throw new IOException("BufferWriteProcessor closed");
+ }
// statistic information for flush
if (lastFlushTime > 0) {
if (LOGGER.isInfoEnabled()) {
@@ -381,12 +409,18 @@ public class BufferWriteProcessor extends Processor {
@Override
public void close() throws BufferWriteProcessorException {
+ if (isClosed) {
+ return;
+ }
try {
long closeStartTime = System.currentTimeMillis();
// flush data and wait for finishing flush
flush().get();
// end file
writer.endFile(fileSchema);
+ writer = null;
+ workMemTable.clear();
+
// update the IntervalFile for interval list
bufferwriteCloseAction.act();
// flush the changed information for filenode
@@ -402,6 +436,7 @@ public class BufferWriteProcessor extends Processor {
DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
closeEndTime - closeStartTime);
}
+ isClosed = true;
} catch (IOException e) {
LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.",
getProcessorName(), e);
@@ -424,13 +459,7 @@ public class BufferWriteProcessor extends Processor {
* @return True if flushing
*/
public boolean isFlush() {
- // starting a flush task has two steps: set the flushMemtable, and then set the flushFuture
- // So, the following case exists: flushMemtable != null but flushFuture is done (because the
- // flushFuture refers to the last finished flush.
- // And, the following case exists,too: flushMemtable == null, but flushFuture is not done.
- // (flushTask() is not finished, but switchToWork() has done)
- // So, checking flushMemTable is more meaningful than flushFuture.isDone().
- return flushMemTable != null;
+ return isFlush;
}
/**
@@ -454,38 +483,6 @@ public class BufferWriteProcessor extends Processor {
return file.length() + memoryUsage();
}
- /**
- * Close current TsFile and open a new one for future writes. Block new writes and wait until
- * current writes finish.
- */
- public void rollToNewFile() {
- // TODO : [MemControl] implement this
- }
-
- /**
- * Check if this TsFile has too big metadata or file. If true, close current file and open a new
- * one.
- */
- private boolean checkSize() {
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- long metaSize = getMetaSize();
- long fileSize = getFileSize();
- if (metaSize >= config.getBufferwriteMetaSizeThreshold()
- || fileSize >= config.getBufferwriteFileSizeThreshold()) {
- LOGGER.info(
- "The bufferwrite processor {}, size({}) of the file {} reaches threshold {}, "
- + "size({}) of metadata reaches threshold {}.",
- getProcessorName(), MemUtils.bytesCntToStr(fileSize), this.fileName,
- MemUtils.bytesCntToStr(config.getBufferwriteFileSizeThreshold()),
- MemUtils.bytesCntToStr(metaSize),
- MemUtils.bytesCntToStr(config.getBufferwriteFileSizeThreshold()));
-
- rollToNewFile();
- return true;
- }
- return false;
- }
-
public String getBaseDir() {
return baseDir;
}
@@ -538,7 +535,9 @@ public class BufferWriteProcessor extends Processor {
* @param measurementId the measurementId of the timeseries to be deleted.
* @param timestamp the upper-bound of deletion time.
*/
- public void delete(String deviceId, String measurementId, long timestamp) {
+ public void delete(String deviceId, String measurementId, long timestamp)
+ throws BufferWriteProcessorException {
+ checkOpen();
workMemTable.delete(deviceId, measurementId, timestamp);
if (isFlush()) {
// flushing MemTable cannot be directly modified since another thread is reading it
@@ -572,4 +571,8 @@ public class BufferWriteProcessor extends Processor {
public String toString() {
return "BufferWriteProcessor in " + insertFilePath;
}
+
+ public boolean isClosed() {
+ return isClosed;
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 47aca8e..488f456 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -534,9 +534,16 @@ public class FileNodeProcessor extends Processor implements IStatistic {
+ System.currentTimeMillis(),
params, versionController, fileSchema);
} catch (BufferWriteProcessorException e) {
- LOGGER.error("The filenode processor {} failed to get the bufferwrite processor.",
- processorName, e);
- throw new FileNodeProcessorException(e);
+ throw new FileNodeProcessorException(String
+ .format("The filenode processor %s failed to get the bufferwrite processor.",
+ processorName),e);
+ }
+ } else if (bufferWriteProcessor.isClosed()){
+ try {
+ bufferWriteProcessor.reopen(insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
+ + System.currentTimeMillis());
+ } catch (BufferWriteProcessorException e) {
+ throw new FileNodeProcessorException("Cannot reopen BufferWriteProcessor", e);
}
}
return bufferWriteProcessor;
@@ -565,6 +572,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
overflowProcessor = new OverflowProcessor(processorName, params, fileSchema,
versionController);
+ } else if (overflowProcessor.isClosed()){
+ overflowProcessor.reopen();
}
return overflowProcessor;
}
@@ -573,8 +582,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* get overflow processor.
*/
public OverflowProcessor getOverflowProcessor() {
- if (overflowProcessor == null) {
- LOGGER.error("The overflow processor is null when getting the overflowProcessor");
+ if (overflowProcessor == null || overflowProcessor.isClosed()) {
+ LOGGER.error("The overflow processor is null or closed when getting the overflowProcessor");
}
return overflowProcessor;
}
@@ -583,15 +592,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
return overflowProcessor != null;
}
- public void setBufferwriteProcessroToClosed() {
-
- bufferWriteProcessor = null;
- }
-
- public boolean hasBufferwriteProcessor() {
-
- return bufferWriteProcessor != null;
- }
/**
* set last update time.
@@ -797,18 +797,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
&& !newFileNodes.get(newFileNodes.size() - 1).getStartTimeMap().isEmpty()) {
unsealedTsFile = new UnsealedTsFile();
unsealedTsFile.setFilePath(newFileNodes.get(newFileNodes.size() - 1).getFilePath());
- if (bufferWriteProcessor == null) {
- LOGGER.error(
- "The last of tsfile {} in filenode processor {} is not closed, "
- + "but the bufferwrite processor is null.",
- newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), getProcessorName());
- throw new FileNodeProcessorException(String.format(
- "The last of tsfile %s in filenode processor %s is not closed, "
- + "but the bufferwrite processor is null.",
- newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), getProcessorName()));
+
+ try {
+ bufferwritedata = bufferWriteProcessor
+ .queryBufferWriteData(deviceId, measurementId, dataType, mSchema.getProps());
+ } catch (BufferWriteProcessorException e) {
+ throw new FileNodeProcessorException(e);
}
- bufferwritedata = bufferWriteProcessor
- .queryBufferWriteData(deviceId, measurementId, dataType, mSchema.getProps());
try {
List<Modification> pathModifications = context.getPathModifications(
@@ -954,7 +949,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
lastMergeTime = System.currentTimeMillis();
- if (overflowProcessor != null) {
+ if (overflowProcessor != null && !overflowProcessor.isClosed()) {
if (overflowProcessor.getFileSize() < IoTDBDescriptor.getInstance()
.getConfig().getOverflowFileSizeThreshold()) {
if (LOGGER.isInfoEnabled()) {
@@ -1714,10 +1709,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
public FileNodeFlushFuture flush() throws IOException {
Future<Boolean> bufferWriteFlushFuture = null;
Future<Boolean> overflowFlushFuture = null;
- if (bufferWriteProcessor != null) {
+ if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
bufferWriteFlushFuture = bufferWriteProcessor.flush();
}
- if (overflowProcessor != null) {
+ if (overflowProcessor != null && !overflowProcessor.isClosed()) {
overflowFlushFuture = overflowProcessor.flush();
}
return new FileNodeFlushFuture(bufferWriteFlushFuture, overflowFlushFuture);
@@ -1727,7 +1722,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* Close the bufferwrite processor.
*/
public void closeBufferWrite() throws FileNodeProcessorException {
- if (bufferWriteProcessor == null) {
+ if (bufferWriteProcessor == null || bufferWriteProcessor.isClosed()) {
return;
}
try {
@@ -1735,7 +1730,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
waitForBufferWriteClose();
}
bufferWriteProcessor.close();
- bufferWriteProcessor = null;
} catch (BufferWriteProcessorException e) {
throw new FileNodeProcessorException(e);
}
@@ -1756,7 +1750,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* Close the overflow processor.
*/
public void closeOverflow() throws FileNodeProcessorException {
- if (overflowProcessor == null) {
+ if (overflowProcessor == null || overflowProcessor.isClosed()) {
return;
}
try {
@@ -1764,9 +1758,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
waitForOverflowClose();
}
overflowProcessor.close();
- overflowProcessor.clear();
- overflowProcessor = null;
- } catch (OverflowProcessorException | IOException e) {
+ } catch (OverflowProcessorException e) {
throw new FileNodeProcessorException(e);
}
}
@@ -1897,7 +1889,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// delete data in memory
OverflowProcessor ofProcessor = getOverflowProcessor(getProcessorName());
ofProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
- if (bufferWriteProcessor != null) {
+ if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
}
} catch (Exception e) {
@@ -1945,8 +1937,12 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
throw e;
}
- if (bufferWriteProcessor != null) {
- bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+ if (bufferWriteProcessor != null && !bufferWriteProcessor.isClosed()) {
+ try {
+ bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+ } catch (BufferWriteProcessorException e) {
+ throw new IOException(e);
+ }
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 2e7302f..1d98d66 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -96,6 +96,9 @@ public class OverflowProcessor extends Processor {
private WriteLogNode logNode;
private VersionController versionController;
+ private boolean isClosed = false;
+ private boolean isFlush = false;
+
public OverflowProcessor(String processorName, Map<String, Action> parameters,
FileSchema fileSchema, VersionController versionController)
throws IOException {
@@ -108,14 +111,7 @@ public class OverflowProcessor extends Processor {
overflowDirPath = overflowDirPath + File.separatorChar;
}
this.parentPath = overflowDirPath + processorName;
- File processorDataDir = new File(parentPath);
- if (!processorDataDir.exists()) {
- processorDataDir.mkdirs();
- }
- // recover file
- recovery(processorDataDir);
- // memory
- workSupport = new OverflowMemtable();
+
overflowFlushAction = parameters.get(FileNodeConstants.OVERFLOW_FLUSH_ACTION);
filenodeFlushAction = parameters
.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
@@ -126,8 +122,36 @@ public class OverflowProcessor extends Processor {
getOverflowRestoreFile(),
FileNodeManager.getInstance().getRestoreFilePath(processorName));
}
+
+ reopen();
}
+ public void reopen() throws IOException {
+ if (!isClosed) {
+ return;
+ }
+ // recover file
+ File processorDataDir = new File(parentPath);
+ if (!processorDataDir.exists()) {
+ processorDataDir.mkdirs();
+ }
+ recovery(processorDataDir);
+
+ // memory
+ if (workSupport == null) {
+ workSupport = new OverflowMemtable();
+ } else {
+ workSupport.clear();
+ }
+
+ }
+ public void checkOpen() throws OverflowProcessorException {
+ if (isClosed) {
+ throw new OverflowProcessorException("OverflowProcessor already closed");
+ }
+ }
+
+
private void recovery(File parentFile) throws IOException {
String[] subFilePaths = clearFile(parentFile.list());
if (subFilePaths.length == 0) {
@@ -173,6 +197,11 @@ public class OverflowProcessor extends Processor {
* insert one time-series record
*/
public void insert(TSRecord tsRecord) throws IOException {
+ try {
+ checkOpen();
+ } catch (OverflowProcessorException e) {
+ throw new IOException(e);
+ }
// memory control
long memUage = MemUtils.getRecordSize(tsRecord);
UsageLevel usageLevel = BasicMemController.getInstance().acquireUsage(this, memUage);
@@ -261,6 +290,11 @@ public class OverflowProcessor extends Processor {
*/
public void delete(String deviceId, String measurementId, long timestamp, long version,
List<ModificationFile> updatedModFiles) throws IOException {
+ try {
+ checkOpen();
+ } catch (OverflowProcessorException e) {
+ throw new IOException(e);
+ }
workResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
workSupport.delete(deviceId, measurementId, timestamp, false);
if (isFlush()) {
@@ -400,8 +434,10 @@ public class OverflowProcessor extends Processor {
private void switchWorkToFlush() {
queryFlushLock.lock();
try {
+ OverflowMemtable temp = flushSupport == null ? new OverflowMemtable() : flushSupport;
flushSupport = workSupport;
- workSupport = new OverflowMemtable();
+ workSupport = temp;
+ isFlush = true;
} finally {
queryFlushLock.unlock();
}
@@ -412,7 +448,7 @@ public class OverflowProcessor extends Processor {
try {
flushSupport.clear();
workResource.appendMetadatas();
- flushSupport = null;
+ isFlush = false;
} finally {
queryFlushLock.unlock();
}
@@ -444,8 +480,7 @@ public class OverflowProcessor extends Processor {
}
public boolean isFlush() {
- //see BufferWriteProcess.isFlush()
- return flushSupport != null;
+ return isFlush;
}
private boolean flushTask(String displayMessage) {
@@ -546,6 +581,9 @@ public class OverflowProcessor extends Processor {
@Override
public void close() throws OverflowProcessorException {
+ if (isClosed) {
+ return;
+ }
LOGGER.info("The overflow processor {} starts close operation.", getProcessorName());
long closeStartTime = System.currentTimeMillis();
// flush data
@@ -570,14 +608,22 @@ public class OverflowProcessor extends Processor {
DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
closeEndTime - closeStartTime);
}
+ try {
+ clear();
+ } catch (IOException e) {
+ throw new OverflowProcessorException(e);
+ }
+ isClosed = true;
}
public void clear() throws IOException {
if (workResource != null) {
workResource.close();
+ workResource = null;
}
if (mergeResource != null) {
mergeResource.close();
+ mergeResource = null;
}
}
@@ -705,4 +751,8 @@ public class OverflowProcessor extends Processor {
public String toString() {
return "OverflowProcessor in " + parentPath;
}
+
+ public boolean isClosed() {
+ return isClosed;
+ }
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
index 213d3c0..d3cf362 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/FileNodeProcessorException.java
@@ -37,4 +37,8 @@ public class FileNodeProcessorException extends ProcessorException {
public FileNodeProcessorException(Throwable throwable) {
super(throwable.getMessage());
}
+
+ public FileNodeProcessorException(String msg, Throwable e) {
+ super(msg, e);
+ }
}