You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/11 09:03:58 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated:
resolve duplicated data in flushing memtable and writer,
refactor flushing memtable data deletion
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 1d8a267 resolve duplicated data in flushing memtable and writer, refactor flushing memtable data deletion
new 774a5f0 Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
1d8a267 is described below
commit 1d8a2679f479968972b44b4539617c0a6742a595
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 11 17:00:07 2019 +0800
resolve duplicated data in flushing memtable and writer, refactor flushing memtable data deletion
---
.../engine/bufferwrite/BufferWriteProcessor.java | 48 ++++---------
.../db/engine/filenode/FileNodeProcessor.java | 2 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 37 ++++++++--
.../apache/iotdb/db/engine/memtable/IMemTable.java | 12 +++-
.../db/engine/memtable/IWritableMemChunk.java | 2 +
.../db/engine/memtable/MemTableFlushTask.java | 2 +-
.../iotdb/db/engine/memtable/WritableMemChunk.java | 10 ++-
.../db/engine/overflow/io/OverflowMemtable.java | 6 +-
.../db/engine/overflow/io/OverflowProcessor.java | 4 +-
.../db/writelog/node/ExclusiveWriteLogNode.java | 16 +++--
.../iotdb/db/writelog/node/WriteLogNode.java | 6 +-
.../recover/ExclusiveLogRecoverPerformer.java | 2 +-
.../writelog/recover/FileNodeRecoverPerformer.java | 55 --------------
.../recover/StorageGroupRecoverPerformer.java | 84 ++++++++++++++++++++++
.../org/apache/iotdb/db/writelog/RecoverTest.java | 2 +-
.../apache/iotdb/db/writelog/WriteLogNodeTest.java | 4 +-
16 files changed, 177 insertions(+), 115 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index b331a57..4148359 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.engine.bufferwrite;
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
@@ -45,11 +47,13 @@ import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.MemTablePool;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.pool.FlushManager;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
+import org.apache.iotdb.db.utils.FileUtils;
import org.apache.iotdb.db.utils.ImmediateFuture;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -76,7 +80,6 @@ public class BufferWriteProcessor extends Processor {
private AtomicLong memSize = new AtomicLong();
private long memThreshold = TSFileConfig.groupSizeInByte;
private IMemTable workMemTable;
- private IMemTable flushMemTable;
// each flush task has a flushId, IO task should scheduled by this id
private long flushId = -1;
@@ -277,7 +280,7 @@ public class BufferWriteProcessor extends Processor {
flushQueryLock.lock();
try {
MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
- if (flushMemTable != null) {
+ if (!flushingMemTables.isEmpty()) {
memSeriesLazyMerger.addMemSeries(flushMemTable.query(deviceId, measurementId, dataType, props));
}
memSeriesLazyMerger.addMemSeries(workMemTable.query(deviceId, measurementId, dataType, props));
@@ -292,27 +295,12 @@ public class BufferWriteProcessor extends Processor {
}
}
- private void switchWorkToFlush() {
- LOGGER.info("BufferWrite Processor {} try to get flushQueryLock for switchWorkToFlush", getProcessorName());
- flushQueryLock.lock();
- LOGGER.info("BufferWrite Processor {} get flushQueryLock for switchWorkToFlush successfully", getProcessorName());
- try {
- IMemTable temp = flushMemTable == null ? MemTablePool.getInstance().getEmptyMemTable() : flushMemTable;
- flushMemTable = workMemTable;
- workMemTable = temp;
- isFlush = true;
- } finally {
- flushQueryLock.unlock();
- LOGGER.info("BufferWrite Processor {} release the flushQueryLock for switchWorkToFlush successfully", getProcessorName());
- }
- }
private void switchFlushToWork() {
LOGGER.info("BufferWrite Processor {} try to get flushQueryLock for switchFlushToWork", getProcessorName());
flushQueryLock.lock();
LOGGER.info("BufferWrite Processor {} get flushQueryLock for switchFlushToWork", getProcessorName());
try {
- flushMemTable.clear();
writer.appendMetadata();
isFlush = false;
} finally {
@@ -361,7 +349,7 @@ public class BufferWriteProcessor extends Processor {
filenodeFlushAction.act();
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- logNode.notifyEndFlush(null, walTaskId);
+ logNode.notifyEndFlush(null, walTaskId, new File(writer.getInsertFilePath()).getName());
}
result = true;
} catch (Exception e) {
@@ -413,17 +401,6 @@ public class BufferWriteProcessor extends Processor {
}
lastFlushTime = System.currentTimeMillis();
// check value count
- // waiting for the end of last flush operation.
- try {
- long startTime = System.currentTimeMillis();
- flushFuture.get();
- long timeCost = System.currentTimeMillis() - startTime;
- if (timeCost > 10) {
- LOGGER.info("BufferWrite Processor {} wait for the previous flushing task for {} ms.", getProcessorName(), timeCost);
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new IOException(e);
- }
if (valueCount > 0) {
// update the lastUpdatetime, prepare for flush
try {
@@ -434,16 +411,14 @@ public class BufferWriteProcessor extends Processor {
}
final long walTaskId;
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- walTaskId = logNode.notifyStartFlush();
+ walTaskId = logNode.notifyStartFlush(new File(writer.getInsertFilePath()).getName());
LOGGER.info("BufferWrite Processor {} has notified WAL for flushing.", getProcessorName());
} else {
walTaskId = 0;
}
valueCount = 0;
- synchronized (flushingMemTables) {
- flushingMemTables.add(workMemTable);
- }
+ flushingMemTables.add(workMemTable);
IMemTable tmpMemTableToFlush = workMemTable;
workMemTable = MemTablePool.getInstance().getEmptyMemTable();
@@ -638,8 +613,11 @@ public class BufferWriteProcessor extends Processor {
workMemTable.delete(deviceId, measurementId, timestamp);
if (isFlush()) {
// flushing MemTable cannot be directly modified since another thread is reading it
- flushMemTable = flushMemTable.copy();
- flushMemTable.delete(deviceId, measurementId, timestamp);
+ for (IMemTable memTable : flushingMemTables) {
+ if (memTable.containSeries(deviceId, measurementId)) {
+ memTable.delete(new Deletion(deviceId + PATH_SEPARATOR + measurementId, 0, timestamp));
+ }
+ }
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 78b41ae..ef36dfa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -115,7 +115,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
private static final String WARN_NO_SUCH_OVERFLOWED_FILE = "Can not find any tsfile which"
+ " will be overflowed in the filenode processor {}, ";
- private static final String RESTORE_FILE_SUFFIX = ".restore";
+ public static final String RESTORE_FILE_SUFFIX = ".restore";
private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeProcessor.class);
private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig();
private static final MManager mManager = MManager.getInstance();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 7e57a6b..1a4f755 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -18,9 +18,14 @@
*/
package org.apache.iotdb.db.engine.memtable;
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -28,6 +33,8 @@ import org.apache.iotdb.tsfile.utils.Binary;
public abstract class AbstractMemTable implements IMemTable {
+ private List<Modification> modifications = new ArrayList<>();
+
private final Map<String, Map<String, IWritableMemChunk>> memTableMap;
public AbstractMemTable() {
@@ -104,14 +111,33 @@ public abstract class AbstractMemTable implements IMemTable {
@Override
public ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
Map<String, String> props) {
- return new ReadOnlyMemChunk(dataType, getSeriesData(deviceId, measurement, dataType), props);
+
+ return new ReadOnlyMemChunk(dataType, getSeriesData(deviceId,
+ measurement, dataType), props);
+ }
+
+ private long findUndeletedTime(String deviceId, String measurement) {
+ String path = deviceId + PATH_SEPARATOR + measurement;
+ long undeletedTime = 0;
+ for (Modification modification : modifications) {
+ if (modification instanceof Deletion) {
+ Deletion deletion = (Deletion) modification;
+ if (deletion.getPath().equals(path) && deletion.getTimestamp() > undeletedTime) {
+ undeletedTime = deletion.getTimestamp();
+ }
+ }
+ }
+ return undeletedTime + 1;
}
private TimeValuePairSorter getSeriesData(String deviceId, String measurement, TSDataType dataType) {
if (!checkPath(deviceId, measurement)) {
return new WritableMemChunk(dataType);
}
- return memTableMap.get(deviceId).get(measurement);
+ long undeletedTime = findUndeletedTime(deviceId, measurement);
+ IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
+ memChunk.setTimeOffset(undeletedTime);
+ return memChunk;
}
@Override
@@ -119,8 +145,6 @@ public abstract class AbstractMemTable implements IMemTable {
Map<String, IWritableMemChunk> deviceMap = memTableMap.get(deviceId);
if (deviceMap != null) {
IWritableMemChunk chunk = deviceMap.get(measurementId);
- //TODO: if the memtable is thread safe, then we do not need to copy data again,
- // otherwise current implementation is error.
IWritableMemChunk newChunk = filterChunk(chunk, timestamp);
if (newChunk != null) {
deviceMap.put(measurementId, newChunk);
@@ -130,6 +154,11 @@ public abstract class AbstractMemTable implements IMemTable {
return false;
}
+ @Override
+ public boolean delete(Deletion deletion) {
+ return this.modifications.add(deletion);
+ }
+
/**
* If chunk contains data with timestamp less than 'timestamp', create a copy and delete all those
* data. Otherwise return null.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index bca8d05..bc28c56 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.memtable;
import java.util.Map;
+import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -64,7 +65,7 @@ public interface IMemTable {
/**
* Delete data in it whose timestamp <= 'timestamp' and belonging to timeseries
- * deviceId.measurementId.
+ * deviceId.measurementId. Only called for non-flushing MemTable.
*
* @param deviceId the deviceId of the timeseries to be deleted.
* @param measurementId the measurementId of the timeseries to be deleted.
@@ -74,6 +75,15 @@ public interface IMemTable {
boolean delete(String deviceId, String measurementId, long timestamp);
/**
+ * Delete data in it whose timestamp <= 'timestamp' and belonging to timeseries
+ * deviceId.measurementId. Only called for flushing MemTable.
+ *
+ * @param deletion an object representing this deletion
+ * @return true if there is data that has been deleted, otherwise false.
+ */
+ boolean delete(Deletion deletion);
+
+ /**
* Make a copy of this MemTable.
*
* @return a MemTable with the same data as this one.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index fbb1fa5..267f26f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -44,4 +44,6 @@ public interface IWritableMemChunk extends TimeValuePairSorter {
int count();
TSDataType getType();
+
+ void setTimeOffset(long offset);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
index 27926c0..1d7b56b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
@@ -138,8 +138,8 @@ public class MemTableFlushTask {
ioTime += System.currentTimeMillis() - starTime;
}
}
- flushCallBack.afterFlush(memTable, tsFileIoWriter);
MemTablePool.getInstance().release(memTable);
+ flushCallBack.afterFlush(memTable, tsFileIoWriter);
tsFileIoWriter.getFlushID().getAndIncrement();
LOGGER.info("BufferWrite Processor {}, flushing a memtable into disk: io cost {} ms.",
processName, ioTime);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 3c36f98..62d3f11 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -34,6 +34,7 @@ public class WritableMemChunk implements IWritableMemChunk {
private TSDataType dataType;
private PrimitiveArrayList list;
+ private long timeOffset = 0;
public WritableMemChunk(TSDataType dataType) {
this.dataType = dataType;
@@ -128,7 +129,9 @@ public class WritableMemChunk implements IWritableMemChunk {
int length = list.size();
Map<Long, TsPrimitiveType> map = new HashMap<>(length, 1.0f);
for (int i = 0; i < length; i++) {
- map.put(list.getTimestamp(i), TsPrimitiveType.getByType(dataType, list.getValue(i)));
+ if (list.getTimestamp(i) >= timeOffset) {
+ map.put(list.getTimestamp(i), TsPrimitiveType.getByType(dataType, list.getValue(i)));
+ }
}
List<TimeValuePair> ret = new ArrayList<>(map.size());
map.forEach((k, v) -> ret.add(new TimeValuePairInMemTable(k, v)));
@@ -151,4 +154,9 @@ public class WritableMemChunk implements IWritableMemChunk {
return dataType;
}
+ @Override
+ public void setTimeOffset(long offset) {
+ timeOffset = offset;
+ }
+
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java
index 5fadb54..5c3d3f5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java
@@ -18,10 +18,13 @@
*/
package org.apache.iotdb.db.engine.overflow.io;
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+
import java.util.HashMap;
import java.util.Map;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics;
@@ -76,8 +79,7 @@ public class OverflowMemtable {
public void delete(String deviceId, String measurementId, long timestamp, boolean isFlushing) {
if (isFlushing) {
- memTable = memTable.copy();
- memTable.delete(deviceId, measurementId, timestamp);
+ memTable.delete(new Deletion(deviceId + PATH_SEPARATOR + measurementId, 0, timestamp));
} else {
memTable.delete(deviceId, measurementId, timestamp);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 8b19e5c..b6bab39 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -496,7 +496,7 @@ public class OverflowProcessor extends Processor {
filenodeFlushAction.act();
// write-ahead log
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- logNode.notifyEndFlush(null, walTaskId);
+ logNode.notifyEndFlush(null, walTaskId, workResource.getInsertFile().getName());
}
result = true;
} catch (IOException e) {
@@ -559,7 +559,7 @@ public class OverflowProcessor extends Processor {
long taskId = 0;
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
try {
- taskId = logNode.notifyStartFlush();
+ taskId = logNode.notifyStartFlush(workResource.getInsertFile().getName());
} catch (IOException e) {
LOGGER.error("Overflow processor {} encountered an error when notifying log node, {}",
getProcessorName(), e);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 3c7fb4d..8ebddb4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -161,15 +161,16 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
* Warning : caller must have lock.
*/
@Override
- public long notifyStartFlush() {
+ public long notifyStartFlush(String tsFileName) {
close();
- File oldLogFile = new File(logDirectory + File.separator + WAL_FILE_NAME);
+ File oldLogFile = new File(logDirectory + File.separator + tsFileName + WAL_FILE_NAME);
if (!oldLogFile.exists()) {
return 0;
}
long id = taskId.incrementAndGet();
- File newLogFile = new File(logDirectory + File.separator + WAL_FILE_NAME + OLD_SUFFIX + id);
+ File newLogFile = new File(logDirectory + File.separator + tsFileName + WAL_FILE_NAME +
+ OLD_SUFFIX + id);
if (!oldLogFile.renameTo(newLogFile)) {
logger.error("Log node {} renaming log file failed!", identifier);
} else {
@@ -183,8 +184,8 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
* Warning : caller must have lock.
*/
@Override
- public void notifyEndFlush(List<LogPosition> logPositions, long taskId) {
- discard(taskId);
+ public void notifyEndFlush(List<LogPosition> logPositions, long taskId, String tsFileName) {
+ discard(taskId, tsFileName);
}
@Override
@@ -270,8 +271,9 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
}
}
- private void discard(long id) {
- File oldLogFile = new File(logDirectory + File.separator + WAL_FILE_NAME + OLD_SUFFIX + id);
+ private void discard(long id, String tsFileName) {
+ File oldLogFile = new File(logDirectory + File.separator + tsFileName + WAL_FILE_NAME
+ + OLD_SUFFIX + id);
if (!oldLogFile.exists()) {
logger.info("No old log to be deleted");
} else {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
index b2e7fc3..dc40731 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
@@ -58,17 +58,19 @@ public interface WriteLogNode {
/**
* When a FileNode attempts to start a flush, this method must be called to rename log file.
+ * @param tsFileName the name of the tsfile to which the data in this wal belongs
* @return the task id ( being used in the renamed log file)
*/
- long notifyStartFlush() throws IOException;
+ long notifyStartFlush(String tsFileName) throws IOException;
/**
* When the flush of a FlieNode ends, this method must be called to check if log file needs
* cleaning.
* @param logPositions (deprecated)
+ * @param tsFileName the name of the tsfile to which the data in this wal belongs
* @param taskId the task id that notifyStartFlush() returns.
*/
- void notifyEndFlush(List<LogPosition> logPositions, long taskId);
+ void notifyEndFlush(List<LogPosition> logPositions, long taskId, String tsFileName);
/**
* return identifier of the log node.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
index 9d4b958..65c29a8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
@@ -70,7 +70,7 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
this.restoreFilePath = restoreFilePath;
this.processorStoreFilePath = processorStoreFilePath;
this.writeLogNode = logNode;
- this.fileNodeRecoverPerformer = new FileNodeRecoverPerformer(writeLogNode.getIdentifier());
+ this.fileNodeRecoverPerformer = new StorageGroupRecoverPerformer(writeLogNode.getIdentifier());
this.isOverflow = logNode.getFileNodeName().contains(IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/FileNodeRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/FileNodeRecoverPerformer.java
deleted file mode 100644
index de339f6..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/FileNodeRecoverPerformer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.writelog.recover;
-
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
-import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.RecoverException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FileNodeRecoverPerformer implements RecoverPerformer {
-
- private static final Logger logger = LoggerFactory.getLogger(FileNodeRecoverPerformer.class);
-
- /**
- * If the storage group is "root.a.b", then the identifier of a bufferwrite processor will be
- * "root.a.b-bufferwrite", and the identifier of an overflow processor will be
- * "root.a.b-overflow".
- */
- private String identifier;
-
- public FileNodeRecoverPerformer(String identifier) {
- this.identifier = identifier;
- }
-
- @Override
- public void recover() throws RecoverException {
- try {
- FileNodeManager.getInstance().recoverFileNode(getFileNodeName());
- } catch (FileNodeManagerException e) {
- logger.error("Cannot recover filenode {}", identifier);
- throw new RecoverException(e);
- }
- }
-
- public String getFileNodeName() {
- return identifier.split("-")[0];
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/StorageGroupRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/StorageGroupRecoverPerformer.java
new file mode 100644
index 0000000..34ea50c
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/StorageGroupRecoverPerformer.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.writelog.recover;
+
+import java.io.File;
+import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
+import org.apache.iotdb.db.exception.RecoverException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StorageGroupRecoverPerformer implements RecoverPerformer {
+
+ private static final Logger logger = LoggerFactory.getLogger(StorageGroupRecoverPerformer.class);
+
+ /**
+ * If the storage group is "root.a.b", then the identifier of a bufferwrite processor will be
+ * "root.a.b-bufferwrite", and the identifier of an overflow processor will be
+ * "root.a.b-overflow".
+ */
+ private String identifier;
+
+ private String storageGroupName;
+
+ public StorageGroupRecoverPerformer(String identifier) {
+ this.identifier = identifier;
+ this.storageGroupName = identifier.split("-")[0];
+ }
+
+ @Override
+ public void recover() {
+ recoverBufferWrite();
+ }
+
+ private void recoverBufferWrite() {
+ List<String> bufferWritePathList = Directories.getInstance().getAllTsFileFolders();
+ File logDirectory = new File(IoTDBDescriptor.getInstance().getConfig().getWalFolder()
+ + File.separator + this.identifier);
+ for (String bufferWritePath : bufferWritePathList) {
+ File bufferDir = new File(bufferWritePath, storageGroupName);
+ // free and close the streams under this bufferwrite directory
+ if (!bufferDir.exists()) {
+ continue;
+ }
+ // only TsFiles with a restore file should be recovered
+ File[] bufferFiles = bufferDir.listFiles((fileName) ->
+ fileName.getName().contains(FileNodeProcessor.RESTORE_FILE_SUFFIX));
+ if (bufferFiles != null) {
+ for (File bufferFile : bufferFiles) {
+ File[] walFiles = null;
+ if (logDirectory.exists()) {
+ walFiles = logDirectory.listFiles((filename) -> {
+ return filename.getName().contains(bufferFile.getName());
+ });
+ }
+ }
+ }
+ }
+ }
+
+ public String getStorageGroupName() {
+ return identifier.split("-")[0];
+ }
+}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
index 9a9617c..45bd737 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
@@ -96,7 +96,7 @@ public class RecoverTest {
logNode.write(bwInsertPlan);
logNode.write(updatePlan);
- logNode.notifyStartFlush();
+ logNode.notifyStartFlush("test");
logNode.write(deletePlan);
logNode.forceSync();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index 321387f..9a4fc61 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -164,13 +164,13 @@ public class WriteLogNodeTest {
config.getWalFolder() + File.separator + "root.logTestDevice" + File.separator + "wal");
assertTrue(walFile.exists());
- long taskId = logNode.notifyStartFlush();
+ long taskId = logNode.notifyStartFlush("test");
File oldWalFile = new File(
config.getWalFolder() + File.separator + "root.logTestDevice" + File.separator + "wal-old"+taskId);
assertTrue(oldWalFile.exists());
assertTrue(oldWalFile.length() > 0);
- logNode.notifyEndFlush(null, taskId);
+ logNode.notifyEndFlush(null, taskId, "test");
assertTrue(!oldWalFile.exists());
assertEquals(0, walFile.length());