You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/20 08:33:18 UTC
[incubator-iotdb] 01/03: remove FileNodeRestore file
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 2d19bdbec97e2f02f324806b2a54574b9a603315
Author: lta <li...@163.com>
AuthorDate: Thu Jun 20 13:09:47 2019 +0800
remove FileNodeRestore file
---
.../iotdb/db/engine/UnsealedTsFileProcessorV2.java | 10 +-
.../db/engine/filenode/FileNodeProcessor.java | 2 +-
.../filenodeV2/FileNodeProcessorStoreV2.java | 173 ---------------------
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 119 ++++----------
.../apache/iotdb/db/engine/memtable/Callback.java | 25 ---
.../db/engine/memtable/MemTableFlushTaskV2.java | 6 +-
6 files changed, 37 insertions(+), 298 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
index a089d0d..7111873 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
@@ -26,10 +26,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.bufferwriteV2.FlushManager;
import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
-import org.apache.iotdb.db.engine.memtable.Callback;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
import org.apache.iotdb.db.engine.memtable.MemTableFlushTaskV2;
@@ -73,7 +74,7 @@ public class UnsealedTsFileProcessorV2 {
protected VersionController versionController;
- private Callback<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor;
+ private Consumer<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor;
/**
* sync this object in query() and asyncFlush()
@@ -81,7 +82,8 @@ public class UnsealedTsFileProcessorV2 {
private final LinkedList<IMemTable> flushingMemTables = new LinkedList<>();
public UnsealedTsFileProcessorV2(String storageGroupName, File tsfile, FileSchema fileSchema,
- VersionController versionController, Callback<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor)
+ VersionController versionController,
+ Consumer<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor)
throws IOException {
this.storageGroupName = storageGroupName;
this.fileSchema = fileSchema;
@@ -182,7 +184,7 @@ public class UnsealedTsFileProcessorV2 {
writer = null;
// remove this processor from Closing list in FileNodeProcessor
- closeUnsealedTsFileProcessor.call(this);
+ closeUnsealedTsFileProcessor.accept(this);
// delete the restore for this bufferwrite processor
if (LOGGER.isInfoEnabled()) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 082063a..7bc28c7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -202,7 +202,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// @Override
// public void act() {
// synchronized (fileNodeProcessorStore) {
-// fileNodeProcessorStore.setLatestTimeMap(lastUpdateTimeMap);
+// fileNodeProcessorStore.setLatestFlushTimeForEachDevice(lastUpdateTimeMap);
// addLastTimeToIntervalFile();
// fileNodeProcessorStore.setSequenceFileList(newFileNodes);
// }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java
deleted file mode 100644
index d16006d..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenodeV2;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-/**
- * FileNodeProcessorStore is used to store information about FileNodeProcessor's status.
- * lastUpdateTime is changed and stored by BufferWrite flushMetadata or BufferWrite setCloseMark.
- * emptyTsFileResource and sequenceFileList are changed and stored by Overflow flushMetadata and
- * Overflow setCloseMark. fileNodeProcessorState is changed and stored by the change of FileNodeProcessor's
- * status such as "work->merge merge->wait wait->work". numOfMergeFile is changed and stored when
- * FileNodeProcessor's status changes from work to merge.
- */
-public class FileNodeProcessorStoreV2 implements Serializable {
-
- private static final long serialVersionUID = -54525372941897565L;
-
- private boolean isOverflowed;
- private Map<String, Long> latestTimeMap;
- private List<TsFileResourceV2> sequenceFileList;
- private List<TsFileResourceV2> unSequenceFileList;
- private int numOfMergeFile;
- private FileNodeProcessorStatus fileNodeProcessorStatus;
-
- /**
- * Constructor of FileNodeProcessorStore.
- *
- * @param isOverflowed whether this FileNode contains unmerged Overflow operations.
- * @param latestTimeMap the timestamp of last data point of each device in this FileNode.
- * @param sequenceFileList sequnce tsfiles in the FileNode.
- * @param unSequenceFileList unsequnce tsfiles in the FileNode
- * @param fileNodeProcessorStatus the status of the FileNode.
- * @param numOfMergeFile the number of files already merged in one merge operation.
- */
- public FileNodeProcessorStoreV2(boolean isOverflowed, Map<String, Long> latestTimeMap,
- List<TsFileResourceV2> sequenceFileList, List<TsFileResourceV2> unSequenceFileList,
- FileNodeProcessorStatus fileNodeProcessorStatus,
- int numOfMergeFile) {
- this.isOverflowed = isOverflowed;
- this.latestTimeMap = latestTimeMap;
- this.sequenceFileList = sequenceFileList;
- this.unSequenceFileList = unSequenceFileList;
- this.fileNodeProcessorStatus = fileNodeProcessorStatus;
- this.numOfMergeFile = numOfMergeFile;
- }
-
- public void serialize(OutputStream outputStream) throws IOException {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- ReadWriteIOUtils.write(this.isOverflowed, byteArrayOutputStream);
- // latestTimeMap
- ReadWriteIOUtils.write(latestTimeMap.size(), byteArrayOutputStream);
- for (Entry<String, Long> entry : latestTimeMap.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), byteArrayOutputStream);
- ReadWriteIOUtils.write(entry.getValue(), byteArrayOutputStream);
- }
- ReadWriteIOUtils.write(this.sequenceFileList.size(), byteArrayOutputStream);
- for (TsFileResourceV2 tsFileResource : this.sequenceFileList) {
- tsFileResource.serialize(byteArrayOutputStream);
- }
- ReadWriteIOUtils.write(this.unSequenceFileList.size(), byteArrayOutputStream);
- for (TsFileResourceV2 tsFileResource : this.unSequenceFileList) {
- tsFileResource.serialize(byteArrayOutputStream);
- }
- ReadWriteIOUtils.write(this.numOfMergeFile, byteArrayOutputStream);
- ReadWriteIOUtils.write(this.fileNodeProcessorStatus.serialize(), byteArrayOutputStream);
- // buffer array to outputstream
- byteArrayOutputStream.writeTo(outputStream);
- }
-
- public static FileNodeProcessorStoreV2 deSerialize(InputStream inputStream) throws IOException {
- boolean isOverflowed = ReadWriteIOUtils.readBool(inputStream);
- Map<String, Long> lastUpdateTimeMap = new HashMap<>();
- int size = ReadWriteIOUtils.readInt(inputStream);
- for (int i = 0; i < size; i++) {
- String path = ReadWriteIOUtils.readString(inputStream);
- long time = ReadWriteIOUtils.readLong(inputStream);
- lastUpdateTimeMap.put(path, time);
- }
- size = ReadWriteIOUtils.readInt(inputStream);
- List<TsFileResourceV2> sequenceFileList = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- sequenceFileList.add(TsFileResourceV2.deSerialize(inputStream));
- }
- size = ReadWriteIOUtils.readInt(inputStream);
- List<TsFileResourceV2> unsequenceFileList = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- unsequenceFileList.add(TsFileResourceV2.deSerialize(inputStream));
- }
- int numOfMergeFile = ReadWriteIOUtils.readInt(inputStream);
- FileNodeProcessorStatus fileNodeProcessorStatus = FileNodeProcessorStatus
- .deserialize(ReadWriteIOUtils.readShort(inputStream));
-
- return new FileNodeProcessorStoreV2(isOverflowed, lastUpdateTimeMap,
- sequenceFileList, unsequenceFileList, fileNodeProcessorStatus, numOfMergeFile);
- }
-
- public boolean isOverflowed() {
- return isOverflowed;
- }
-
- public void setOverflowed(boolean isOverflowed) {
- this.isOverflowed = isOverflowed;
- }
-
- public FileNodeProcessorStatus getFileNodeProcessorStatus() {
- return fileNodeProcessorStatus;
- }
-
- public void setFileNodeProcessorStatus(FileNodeProcessorStatus fileNodeProcessorStatus) {
- this.fileNodeProcessorStatus = fileNodeProcessorStatus;
- }
-
- public Map<String, Long> getLatestTimeMap() {
- return new HashMap<>(latestTimeMap);
- }
-
- public void setLatestTimeMap(Map<String, Long> latestTimeMap) {
- this.latestTimeMap = latestTimeMap;
- }
-
- public List<TsFileResourceV2> getSequenceFileList() {
- return sequenceFileList;
- }
-
- public void setSequenceFileList(List<TsFileResourceV2> sequenceFileList) {
- this.sequenceFileList = sequenceFileList;
- }
-
- public List<TsFileResourceV2> getUnSequenceFileList() {
- return unSequenceFileList;
- }
-
- public int getNumOfMergeFile() {
- return numOfMergeFile;
- }
-
- public void setNumOfMergeFile(int numOfMergeFile) {
- this.numOfMergeFile = numOfMergeFile;
- }
-
- public void setUnSequenceFileList(
- List<TsFileResourceV2> unSequenceFileList) {
- this.unSequenceFileList = unSequenceFileList;
- }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index fee51dc..6dedfa8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -19,12 +19,11 @@
package org.apache.iotdb.db.engine.filenodeV2;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -34,7 +33,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.UnsealedTsFileProcessorV2;
import org.apache.iotdb.db.engine.filenode.CopyOnReadLinkedList;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSourceV2;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -67,19 +65,19 @@ public class FileNodeProcessorV2 {
private FileSchema fileSchema;
// includes sealed and unsealed sequnce tsfiles
- private List<TsFileResourceV2> sequenceFileList;
+ private List<TsFileResourceV2> sequenceFileList = new ArrayList<>();
private UnsealedTsFileProcessorV2 workUnsealedSequenceTsFileProcessor = null;
private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
// includes sealed and unsealed unsequnce tsfiles
- private List<TsFileResourceV2> unSequenceFileList;
+ private List<TsFileResourceV2> unSequenceFileList = new ArrayList<>();
private UnsealedTsFileProcessorV2 workUnsealedUnSequenceTsFileProcessor = null;
private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
/**
* device -> global latest timestamp of each device
*/
- private Map<String, Long> latestTimeForEachDevice;
+ private Map<String, Long> latestTimeForEachDevice = new HashMap<>();
/**
* device -> largest timestamp of the latest memtable to be submitted to asyncFlush
@@ -92,14 +90,6 @@ public class FileNodeProcessorV2 {
private VersionController versionController;
- // TODO delete the file path
- private String absoluteFileNodeRestoreFilePath;
-
- private FileNodeProcessorStoreV2 fileNodeProcessorStore;
-
- // TODO delete this lock
- private final Object fileNodeRestoreLock = new Object();
-
public FileNodeProcessorV2(String absoluteBaseDir, String storageGroupName)
throws FileNodeProcessorException {
this.storageGroupName = storageGroupName;
@@ -122,20 +112,9 @@ public class FileNodeProcessorV2 {
"directory {}", storageGroupName, restoreFolder.getAbsolutePath());
}
- absoluteFileNodeRestoreFilePath = new File(restoreFolder, storageGroupName + RESTORE_FILE_SUFFIX).getAbsolutePath();
-
- try {
- fileNodeProcessorStore = readStoreFromDiskOrCreate();
- } catch (FileNodeProcessorException e) {
- LOGGER.error("The fileNode processor {} encountered an error when recovering restore " +
- "information.", storageGroupName);
- throw new FileNodeProcessorException(e);
- }
+ String absoluteFileNodeRestoreFilePath = new File(restoreFolder, storageGroupName + RESTORE_FILE_SUFFIX).getAbsolutePath();
- // TODO deep clone the lastupdate time, change the getSequenceFileList to V2
- sequenceFileList = fileNodeProcessorStore.getSequenceFileList();
- unSequenceFileList = fileNodeProcessorStore.getUnSequenceFileList();
- latestTimeForEachDevice = fileNodeProcessorStore.getLatestTimeMap();
+ recovery();
/**
* version controller
@@ -150,6 +129,10 @@ public class FileNodeProcessorV2 {
this.fileSchema = constructFileSchema(storageGroupName);
}
+ // TODO: Jiang Tian
+ private void recovery(){
+ }
+
private FileSchema constructFileSchema(String storageGroupName) {
List<MeasurementSchema> columnSchemaList;
columnSchemaList = mManager.getSchemaForFileName(storageGroupName);
@@ -177,44 +160,6 @@ public class FileNodeProcessorV2 {
}
}
-
- /**
- * read file node store from disk or create a new one
- */
- private FileNodeProcessorStoreV2 readStoreFromDiskOrCreate() throws FileNodeProcessorException {
-
- synchronized (fileNodeRestoreLock) {
- File restoreFile = new File(absoluteFileNodeRestoreFilePath);
- if (!restoreFile.exists() || restoreFile.length() == 0) {
- return new FileNodeProcessorStoreV2(false, new HashMap<>(),
- new ArrayList<>(), new ArrayList<>(), FileNodeProcessorStatus.NONE, 0);
- }
- try (FileInputStream inputStream = new FileInputStream(absoluteFileNodeRestoreFilePath)) {
- return FileNodeProcessorStoreV2.deSerialize(inputStream);
- } catch (IOException e) {
- LOGGER
- .error("Failed to deserialize the FileNodeRestoreFile {}, {}",
- absoluteFileNodeRestoreFilePath,
- e);
- throw new FileNodeProcessorException(e);
- }
- }
- }
-
- private void writeStoreToDisk(FileNodeProcessorStoreV2 fileNodeProcessorStore)
- throws FileNodeProcessorException {
-
- synchronized (fileNodeRestoreLock) {
- try (FileOutputStream fileOutputStream = new FileOutputStream(absoluteFileNodeRestoreFilePath)) {
- fileNodeProcessorStore.serialize(fileOutputStream);
- LOGGER.debug("The filenode processor {} writes restore information to the restore file",
- storageGroupName);
- } catch (IOException e) {
- throw new FileNodeProcessorException(e);
- }
- }
- }
-
public boolean insert(TSRecord tsRecord) {
lock.writeLock().lock();
boolean result;
@@ -247,16 +192,18 @@ public class FileNodeProcessorV2 {
if (unsealedTsFileProcessor == null) {
if (sequence) {
String baseDir = directories.getNextFolderForTsfile();
- String filePath = Paths.get(baseDir, storageGroupName, tsRecord.time + "").toString();
- unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
- fileSchema, versionController, this::closeUnsealedTsFileProcessorCallBack);
+ String filePath = Paths.get(baseDir, storageGroupName, System.currentTimeMillis() + "").toString();
+ unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
+ new File(filePath),
+ fileSchema, versionController, this::closeUnsealedTsFileProcessor);
sequenceFileList.add(unsealedTsFileProcessor.getTsFileResource());
} else {
// TODO check if the disk is full
String baseDir = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDir();
- String filePath = Paths.get(baseDir, storageGroupName, tsRecord.time + "").toString();
- unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
- fileSchema, versionController, this::closeUnsealedTsFileProcessorCallBack);
+ String filePath = Paths.get(baseDir, storageGroupName, System.currentTimeMillis() + "").toString();
+ unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
+ new File(filePath),
+ fileSchema, versionController, this::closeUnsealedTsFileProcessor);
unSequenceFileList.add(unsealedTsFileProcessor.getTsFileResource());
}
}
@@ -356,29 +303,17 @@ public class FileNodeProcessorV2 {
* put the memtable back to the MemTablePool and make the metadata in writer visible
*/
// TODO please consider concurrency with query and write method.
- private void closeUnsealedTsFileProcessorCallBack(UnsealedTsFileProcessorV2 bufferWriteProcessor) {
+ private void closeUnsealedTsFileProcessor(UnsealedTsFileProcessorV2 bufferWriteProcessor) {
closingSequenceTsFileProcessor.remove(bufferWriteProcessor);
- synchronized (fileNodeProcessorStore) {
- fileNodeProcessorStore.setLatestTimeMap(latestTimeForEachDevice);
-
- if (!sequenceFileList.isEmpty()) {
- // end time with one start time
- Map<String, Long> endTimeMap = new HashMap<>();
- TsFileResourceV2 resource = workUnsealedSequenceTsFileProcessor.getTsFileResource();
- synchronized (resource) {
- for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
- String deviceId = startTime.getKey();
- endTimeMap.put(deviceId, latestTimeForEachDevice.get(deviceId));
- }
- resource.setEndTimeMap(endTimeMap);
- }
- }
- fileNodeProcessorStore.setSequenceFileList(sequenceFileList);
- try {
- writeStoreToDisk(fileNodeProcessorStore);
- } catch (FileNodeProcessorException e) {
- LOGGER.error("write FileNodeStore info error, because {}", e.getMessage(), e);
+ // end time with one start time
+ Map<String, Long> endTimeMap = new HashMap<>();
+ TsFileResourceV2 resource = workUnsealedSequenceTsFileProcessor.getTsFileResource();
+ synchronized (resource) {
+ for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
+ String deviceId = startTime.getKey();
+ endTimeMap.put(deviceId, latestTimeForEachDevice.get(deviceId));
}
+ resource.setEndTimeMap(endTimeMap);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java
deleted file mode 100644
index 1706203..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memtable;
-
-public interface Callback<T> {
-
- void call(T object);
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index a5e229a..621b5fc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -43,11 +43,11 @@ public class MemTableFlushTaskV2 {
private boolean stop = false;
private String processorName;
- private Callback<IMemTable> flushCallBack;
+ private Consumer<IMemTable> flushCallBack;
private IMemTable memTable;
public MemTableFlushTaskV2(NativeRestorableIOWriter writer, String processorName,
- Callback<IMemTable> callBack) {
+ Consumer<IMemTable> callBack) {
this.tsFileIoWriter = writer;
this.processorName = processorName;
this.flushCallBack = callBack;
@@ -135,7 +135,7 @@ public class MemTableFlushTaskV2 {
LOGGER.info("Processor {} return back a memtable to MemTablePool", processorName);
tsFileIoWriter.makeMetadataVisible();
- flushCallBack.call(memTable);
+ flushCallBack.accept(memTable);
});