You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/20 08:33:18 UTC

[incubator-iotdb] 01/03: remove FileNodeRestore file

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2d19bdbec97e2f02f324806b2a54574b9a603315
Author: lta <li...@163.com>
AuthorDate: Thu Jun 20 13:09:47 2019 +0800

    remove FileNodeRestore file
---
 .../iotdb/db/engine/UnsealedTsFileProcessorV2.java |  10 +-
 .../db/engine/filenode/FileNodeProcessor.java      |   2 +-
 .../filenodeV2/FileNodeProcessorStoreV2.java       | 173 ---------------------
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 119 ++++----------
 .../apache/iotdb/db/engine/memtable/Callback.java  |  25 ---
 .../db/engine/memtable/MemTableFlushTaskV2.java    |   6 +-
 6 files changed, 37 insertions(+), 298 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
index a089d0d..7111873 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
@@ -26,10 +26,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.bufferwriteV2.FlushManager;
 import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
-import org.apache.iotdb.db.engine.memtable.Callback;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushTaskV2;
@@ -73,7 +74,7 @@ public class UnsealedTsFileProcessorV2 {
 
   protected VersionController versionController;
 
-  private Callback<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor;
+  private Consumer<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor;
 
   /**
    * sync this object in query() and asyncFlush()
@@ -81,7 +82,8 @@ public class UnsealedTsFileProcessorV2 {
   private final LinkedList<IMemTable> flushingMemTables = new LinkedList<>();
 
   public UnsealedTsFileProcessorV2(String storageGroupName, File tsfile, FileSchema fileSchema,
-      VersionController versionController, Callback<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor)
+      VersionController versionController,
+      Consumer<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor)
       throws IOException {
     this.storageGroupName = storageGroupName;
     this.fileSchema = fileSchema;
@@ -182,7 +184,7 @@ public class UnsealedTsFileProcessorV2 {
     writer = null;
 
     // remove this processor from Closing list in FileNodeProcessor
-    closeUnsealedTsFileProcessor.call(this);
+    closeUnsealedTsFileProcessor.accept(this);
 
     // delete the restore for this bufferwrite processor
     if (LOGGER.isInfoEnabled()) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 082063a..7bc28c7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -202,7 +202,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
 //    @Override
 //    public void act() {
 //      synchronized (fileNodeProcessorStore) {
-//        fileNodeProcessorStore.setLatestTimeMap(lastUpdateTimeMap);
+//        fileNodeProcessorStore.setLatestFlushTimeForEachDevice(lastUpdateTimeMap);
 //        addLastTimeToIntervalFile();
 //        fileNodeProcessorStore.setSequenceFileList(newFileNodes);
 //      }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java
deleted file mode 100644
index d16006d..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.filenodeV2;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-/**
- * FileNodeProcessorStore is used to store information about FileNodeProcessor's status.
- * lastUpdateTime is changed and stored by BufferWrite flushMetadata or BufferWrite setCloseMark.
- * emptyTsFileResource and sequenceFileList are changed and stored by Overflow flushMetadata and
- * Overflow setCloseMark. fileNodeProcessorState is changed and stored by the change of FileNodeProcessor's
- * status such as "work->merge merge->wait wait->work". numOfMergeFile is changed and stored when
- * FileNodeProcessor's status changes from work to merge.
- */
-public class FileNodeProcessorStoreV2 implements Serializable {
-
-  private static final long serialVersionUID = -54525372941897565L;
-
-  private boolean isOverflowed;
-  private Map<String, Long> latestTimeMap;
-  private List<TsFileResourceV2> sequenceFileList;
-  private List<TsFileResourceV2> unSequenceFileList;
-  private int numOfMergeFile;
-  private FileNodeProcessorStatus fileNodeProcessorStatus;
-
-  /**
-   * Constructor of FileNodeProcessorStore.
-   *
-   * @param isOverflowed whether this FileNode contains unmerged Overflow operations.
-   * @param latestTimeMap the timestamp of last data point of each device in this FileNode.
-   * @param sequenceFileList sequnce tsfiles in the FileNode.
-   * @param unSequenceFileList unsequnce tsfiles in the FileNode
-   * @param fileNodeProcessorStatus the status of the FileNode.
-   * @param numOfMergeFile the number of files already merged in one merge operation.
-   */
-  public FileNodeProcessorStoreV2(boolean isOverflowed, Map<String, Long> latestTimeMap,
-      List<TsFileResourceV2> sequenceFileList, List<TsFileResourceV2> unSequenceFileList,
-      FileNodeProcessorStatus fileNodeProcessorStatus,
-      int numOfMergeFile) {
-    this.isOverflowed = isOverflowed;
-    this.latestTimeMap = latestTimeMap;
-    this.sequenceFileList = sequenceFileList;
-    this.unSequenceFileList = unSequenceFileList;
-    this.fileNodeProcessorStatus = fileNodeProcessorStatus;
-    this.numOfMergeFile = numOfMergeFile;
-  }
-
-  public void serialize(OutputStream outputStream) throws IOException {
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-    ReadWriteIOUtils.write(this.isOverflowed, byteArrayOutputStream);
-    // latestTimeMap
-    ReadWriteIOUtils.write(latestTimeMap.size(), byteArrayOutputStream);
-    for (Entry<String, Long> entry : latestTimeMap.entrySet()) {
-      ReadWriteIOUtils.write(entry.getKey(), byteArrayOutputStream);
-      ReadWriteIOUtils.write(entry.getValue(), byteArrayOutputStream);
-    }
-    ReadWriteIOUtils.write(this.sequenceFileList.size(), byteArrayOutputStream);
-    for (TsFileResourceV2 tsFileResource : this.sequenceFileList) {
-      tsFileResource.serialize(byteArrayOutputStream);
-    }
-    ReadWriteIOUtils.write(this.unSequenceFileList.size(), byteArrayOutputStream);
-    for (TsFileResourceV2 tsFileResource : this.unSequenceFileList) {
-      tsFileResource.serialize(byteArrayOutputStream);
-    }
-    ReadWriteIOUtils.write(this.numOfMergeFile, byteArrayOutputStream);
-    ReadWriteIOUtils.write(this.fileNodeProcessorStatus.serialize(), byteArrayOutputStream);
-    // buffer array to outputstream
-    byteArrayOutputStream.writeTo(outputStream);
-  }
-
-  public static FileNodeProcessorStoreV2 deSerialize(InputStream inputStream) throws IOException {
-    boolean isOverflowed = ReadWriteIOUtils.readBool(inputStream);
-    Map<String, Long> lastUpdateTimeMap = new HashMap<>();
-    int size = ReadWriteIOUtils.readInt(inputStream);
-    for (int i = 0; i < size; i++) {
-      String path = ReadWriteIOUtils.readString(inputStream);
-      long time = ReadWriteIOUtils.readLong(inputStream);
-      lastUpdateTimeMap.put(path, time);
-    }
-    size = ReadWriteIOUtils.readInt(inputStream);
-    List<TsFileResourceV2> sequenceFileList = new ArrayList<>();
-    for (int i = 0; i < size; i++) {
-      sequenceFileList.add(TsFileResourceV2.deSerialize(inputStream));
-    }
-    size = ReadWriteIOUtils.readInt(inputStream);
-    List<TsFileResourceV2> unsequenceFileList = new ArrayList<>();
-    for (int i = 0; i < size; i++) {
-      unsequenceFileList.add(TsFileResourceV2.deSerialize(inputStream));
-    }
-    int numOfMergeFile = ReadWriteIOUtils.readInt(inputStream);
-    FileNodeProcessorStatus fileNodeProcessorStatus = FileNodeProcessorStatus
-        .deserialize(ReadWriteIOUtils.readShort(inputStream));
-
-    return new FileNodeProcessorStoreV2(isOverflowed, lastUpdateTimeMap,
-        sequenceFileList, unsequenceFileList, fileNodeProcessorStatus, numOfMergeFile);
-  }
-
-  public boolean isOverflowed() {
-    return isOverflowed;
-  }
-
-  public void setOverflowed(boolean isOverflowed) {
-    this.isOverflowed = isOverflowed;
-  }
-
-  public FileNodeProcessorStatus getFileNodeProcessorStatus() {
-    return fileNodeProcessorStatus;
-  }
-
-  public void setFileNodeProcessorStatus(FileNodeProcessorStatus fileNodeProcessorStatus) {
-    this.fileNodeProcessorStatus = fileNodeProcessorStatus;
-  }
-
-  public Map<String, Long> getLatestTimeMap() {
-    return new HashMap<>(latestTimeMap);
-  }
-
-  public void setLatestTimeMap(Map<String, Long> latestTimeMap) {
-    this.latestTimeMap = latestTimeMap;
-  }
-
-  public List<TsFileResourceV2> getSequenceFileList() {
-    return sequenceFileList;
-  }
-
-  public void setSequenceFileList(List<TsFileResourceV2> sequenceFileList) {
-    this.sequenceFileList = sequenceFileList;
-  }
-
-  public List<TsFileResourceV2> getUnSequenceFileList() {
-    return unSequenceFileList;
-  }
-
-  public int getNumOfMergeFile() {
-    return numOfMergeFile;
-  }
-
-  public void setNumOfMergeFile(int numOfMergeFile) {
-    this.numOfMergeFile = numOfMergeFile;
-  }
-
-  public void setUnSequenceFileList(
-      List<TsFileResourceV2> unSequenceFileList) {
-    this.unSequenceFileList = unSequenceFileList;
-  }
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index fee51dc..6dedfa8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -19,12 +19,11 @@
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -34,7 +33,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.UnsealedTsFileProcessorV2;
 import org.apache.iotdb.db.engine.filenode.CopyOnReadLinkedList;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
 import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSourceV2;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -67,19 +65,19 @@ public class FileNodeProcessorV2 {
   private FileSchema fileSchema;
 
   // includes sealed and unsealed sequnce tsfiles
-  private List<TsFileResourceV2> sequenceFileList;
+  private List<TsFileResourceV2> sequenceFileList = new ArrayList<>();
   private UnsealedTsFileProcessorV2 workUnsealedSequenceTsFileProcessor = null;
   private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
 
   // includes sealed and unsealed unsequnce tsfiles
-  private List<TsFileResourceV2> unSequenceFileList;
+  private List<TsFileResourceV2> unSequenceFileList = new ArrayList<>();
   private UnsealedTsFileProcessorV2 workUnsealedUnSequenceTsFileProcessor = null;
   private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
 
   /**
    * device -> global latest timestamp of each device
    */
-  private Map<String, Long> latestTimeForEachDevice;
+  private Map<String, Long> latestTimeForEachDevice = new HashMap<>();
 
   /**
    * device -> largest timestamp of the latest memtable to be submitted to asyncFlush
@@ -92,14 +90,6 @@ public class FileNodeProcessorV2 {
 
   private VersionController versionController;
 
-  // TODO delete the file path
-  private String absoluteFileNodeRestoreFilePath;
-
-  private FileNodeProcessorStoreV2 fileNodeProcessorStore;
-
-  // TODO delete this lock
-  private final Object fileNodeRestoreLock = new Object();
-
   public FileNodeProcessorV2(String absoluteBaseDir, String storageGroupName)
       throws FileNodeProcessorException {
     this.storageGroupName = storageGroupName;
@@ -122,20 +112,9 @@ public class FileNodeProcessorV2 {
           "directory {}", storageGroupName, restoreFolder.getAbsolutePath());
     }
 
-    absoluteFileNodeRestoreFilePath = new File(restoreFolder, storageGroupName + RESTORE_FILE_SUFFIX).getAbsolutePath();
-
-    try {
-      fileNodeProcessorStore = readStoreFromDiskOrCreate();
-    } catch (FileNodeProcessorException e) {
-      LOGGER.error("The fileNode processor {} encountered an error when recovering restore " +
-          "information.", storageGroupName);
-      throw new FileNodeProcessorException(e);
-    }
+    String absoluteFileNodeRestoreFilePath = new File(restoreFolder, storageGroupName + RESTORE_FILE_SUFFIX).getAbsolutePath();
 
-    // TODO deep clone the lastupdate time, change the getSequenceFileList to V2
-    sequenceFileList = fileNodeProcessorStore.getSequenceFileList();
-    unSequenceFileList = fileNodeProcessorStore.getUnSequenceFileList();
-    latestTimeForEachDevice = fileNodeProcessorStore.getLatestTimeMap();
+    recovery();
 
     /**
      * version controller
@@ -150,6 +129,10 @@ public class FileNodeProcessorV2 {
     this.fileSchema = constructFileSchema(storageGroupName);
   }
 
+  // TODO: Jiang Tian
+  private void recovery(){
+  }
+
   private FileSchema constructFileSchema(String storageGroupName) {
     List<MeasurementSchema> columnSchemaList;
     columnSchemaList = mManager.getSchemaForFileName(storageGroupName);
@@ -177,44 +160,6 @@ public class FileNodeProcessorV2 {
     }
   }
 
-
-  /**
-   * read file node store from disk or create a new one
-   */
-  private FileNodeProcessorStoreV2 readStoreFromDiskOrCreate() throws FileNodeProcessorException {
-
-    synchronized (fileNodeRestoreLock) {
-      File restoreFile = new File(absoluteFileNodeRestoreFilePath);
-      if (!restoreFile.exists() || restoreFile.length() == 0) {
-        return new FileNodeProcessorStoreV2(false, new HashMap<>(),
-            new ArrayList<>(), new ArrayList<>(), FileNodeProcessorStatus.NONE, 0);
-      }
-      try (FileInputStream inputStream = new FileInputStream(absoluteFileNodeRestoreFilePath)) {
-        return FileNodeProcessorStoreV2.deSerialize(inputStream);
-      } catch (IOException e) {
-        LOGGER
-            .error("Failed to deserialize the FileNodeRestoreFile {}, {}",
-                absoluteFileNodeRestoreFilePath,
-                e);
-        throw new FileNodeProcessorException(e);
-      }
-    }
-  }
-
-  private void writeStoreToDisk(FileNodeProcessorStoreV2 fileNodeProcessorStore)
-      throws FileNodeProcessorException {
-
-    synchronized (fileNodeRestoreLock) {
-      try (FileOutputStream fileOutputStream = new FileOutputStream(absoluteFileNodeRestoreFilePath)) {
-        fileNodeProcessorStore.serialize(fileOutputStream);
-        LOGGER.debug("The filenode processor {} writes restore information to the restore file",
-            storageGroupName);
-      } catch (IOException e) {
-        throw new FileNodeProcessorException(e);
-      }
-    }
-  }
-
   public boolean insert(TSRecord tsRecord) {
     lock.writeLock().lock();
     boolean result;
@@ -247,16 +192,18 @@ public class FileNodeProcessorV2 {
     if (unsealedTsFileProcessor == null) {
       if (sequence) {
         String baseDir = directories.getNextFolderForTsfile();
-        String filePath = Paths.get(baseDir, storageGroupName, tsRecord.time + "").toString();
-        unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
-            fileSchema, versionController, this::closeUnsealedTsFileProcessorCallBack);
+        String filePath = Paths.get(baseDir, storageGroupName, System.currentTimeMillis() + "").toString();
+        unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
+            new File(filePath),
+            fileSchema, versionController, this::closeUnsealedTsFileProcessor);
         sequenceFileList.add(unsealedTsFileProcessor.getTsFileResource());
       } else {
         // TODO check if the disk is full
         String baseDir = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDir();
-        String filePath = Paths.get(baseDir, storageGroupName, tsRecord.time + "").toString();
-        unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
-            fileSchema, versionController, this::closeUnsealedTsFileProcessorCallBack);
+        String filePath = Paths.get(baseDir, storageGroupName, System.currentTimeMillis() + "").toString();
+        unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
+            new File(filePath),
+            fileSchema, versionController, this::closeUnsealedTsFileProcessor);
         unSequenceFileList.add(unsealedTsFileProcessor.getTsFileResource());
       }
     }
@@ -356,29 +303,17 @@ public class FileNodeProcessorV2 {
    * put the memtable back to the MemTablePool and make the metadata in writer visible
    */
   // TODO please consider concurrency with query and write method.
-  private void closeUnsealedTsFileProcessorCallBack(UnsealedTsFileProcessorV2 bufferWriteProcessor) {
+  private void closeUnsealedTsFileProcessor(UnsealedTsFileProcessorV2 bufferWriteProcessor) {
     closingSequenceTsFileProcessor.remove(bufferWriteProcessor);
-    synchronized (fileNodeProcessorStore) {
-      fileNodeProcessorStore.setLatestTimeMap(latestTimeForEachDevice);
-
-      if (!sequenceFileList.isEmpty()) {
-        // end time with one start time
-        Map<String, Long> endTimeMap = new HashMap<>();
-        TsFileResourceV2 resource = workUnsealedSequenceTsFileProcessor.getTsFileResource();
-        synchronized (resource) {
-          for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
-            String deviceId = startTime.getKey();
-            endTimeMap.put(deviceId, latestTimeForEachDevice.get(deviceId));
-          }
-          resource.setEndTimeMap(endTimeMap);
-        }
-      }
-      fileNodeProcessorStore.setSequenceFileList(sequenceFileList);
-      try {
-        writeStoreToDisk(fileNodeProcessorStore);
-      } catch (FileNodeProcessorException e) {
-        LOGGER.error("write FileNodeStore info error, because {}", e.getMessage(), e);
+    // end time with one start time
+    Map<String, Long> endTimeMap = new HashMap<>();
+    TsFileResourceV2 resource = workUnsealedSequenceTsFileProcessor.getTsFileResource();
+    synchronized (resource) {
+      for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
+        String deviceId = startTime.getKey();
+        endTimeMap.put(deviceId, latestTimeForEachDevice.get(deviceId));
       }
+      resource.setEndTimeMap(endTimeMap);
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java
deleted file mode 100644
index 1706203..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/Callback.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memtable;
-
-public interface Callback<T> {
-
-  void call(T object);
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index a5e229a..621b5fc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -43,11 +43,11 @@ public class MemTableFlushTaskV2 {
   private boolean stop = false;
   private String processorName;
 
-  private Callback<IMemTable> flushCallBack;
+  private Consumer<IMemTable> flushCallBack;
   private IMemTable memTable;
 
   public MemTableFlushTaskV2(NativeRestorableIOWriter writer, String processorName,
-      Callback<IMemTable> callBack) {
+      Consumer<IMemTable> callBack) {
     this.tsFileIoWriter = writer;
     this.processorName = processorName;
     this.flushCallBack = callBack;
@@ -135,7 +135,7 @@ public class MemTableFlushTaskV2 {
     LOGGER.info("Processor {} return back a memtable to MemTablePool", processorName);
 
     tsFileIoWriter.makeMetadataVisible();
-    flushCallBack.call(memTable);
+    flushCallBack.accept(memTable);
   });