You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/27 07:35:08 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: improve naming in MemtableFlushTaskV2 and close in TsFileResourceV2

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new efdb818  improve naming in MemtableFlushTaskV2 and close in TsFileResourceV2
     new da98910  Merge remote-tracking branch 'origin/feature_async_close_tsfile' into feature_async_close_tsfile
efdb818 is described below

commit efdb8184b5ab7805c72f21f698c4f9b859962c4a
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Jun 27 15:34:19 2019 +0800

    improve naming in MemtableFlushTaskV2 and close in TsFileResourceV2
---
 .../CopyOnReadLinkedList.java                      |  2 +-
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  |  6 +-
 .../db/engine/filenodeV2/TsFileResourceV2.java     | 13 +---
 .../filenodeV2/UnsealedTsFileProcessorV2.java      |  2 +-
 .../db/engine/memtable/MemTableFlushTaskV2.java    | 71 +++++++++++-----------
 .../filenodeV2/UnsealedTsFileProcessorV2Test.java  |  2 +-
 6 files changed, 42 insertions(+), 54 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/CopyOnReadLinkedList.java
similarity index 97%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/CopyOnReadLinkedList.java
index e6c8249..4be1116 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/CopyOnReadLinkedList.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.engine.filenode;
+package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.util.ArrayList;
 import java.util.Iterator;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 7c093b8..0f1a46a 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -35,7 +35,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
-import org.apache.iotdb.db.engine.filenode.CopyOnReadLinkedList;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
@@ -48,7 +47,6 @@ import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -67,12 +65,12 @@ public class FileNodeProcessorV2 {
 
   private FileSchema fileSchema;
 
-  // includes sealed and unsealed sequnce tsfiles
+  // includes sealed and unsealed sequence tsfiles
   private List<TsFileResourceV2> sequenceFileList = new ArrayList<>();
   private UnsealedTsFileProcessorV2 workSequenceTsFileProcessor = null;
   private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
 
-  // includes sealed and unsealed unsequnce tsfiles
+  // includes sealed and unsealed unSequence tsfiles
   private List<TsFileResourceV2> unSequenceFileList = new ArrayList<>();
   private UnsealedTsFileProcessorV2 workUnSequenceTsFileProcessor = null;
   private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index 2bfb04f..acbf8c2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -95,14 +95,6 @@ public class TsFileResourceV2 {
     this.readOnlyMemChunk = readOnlyMemChunk;
   }
 
-  public TsFileResourceV2(File file, Map<String, Long> startTimeMap, Map<String, Long> endTimeMap) {
-    this.file = file;
-    this.startTimeMap = startTimeMap;
-    this.endTimeMap = endTimeMap;
-    this.closed = true;
-  }
-
-
   public void serialize() throws IOException {
     try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file + RESOURCE_SUFFIX))){
       ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream);
@@ -118,7 +110,6 @@ public class TsFileResourceV2 {
     }
   }
 
-
   public void deSerialize() throws IOException {
     try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file + RESOURCE_SUFFIX))) {
       int size = ReadWriteIOUtils.readInt(inputStream);
@@ -191,8 +182,8 @@ public class TsFileResourceV2 {
     return closed;
   }
 
-  public void setClosed(boolean closed) {
-    this.closed = closed;
+  public void close() {
+    closed = true;
     processor = null;
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 2464821..c15fac1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -389,7 +389,7 @@ public class UnsealedTsFileProcessorV2 {
   }
 
   public void close() throws IOException {
-    tsFileResource.setClosed(true);
+    tsFileResource.close();
     MultiFileLogNodeManager.getInstance().deleteNode(storageGroupName + "-" + tsFileResource.getFile().getName());
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index abee737..f933939 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -15,7 +15,6 @@
 package org.apache.iotdb.db.engine.memtable;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -44,15 +43,15 @@ public class MemTableFlushTaskV2 {
   private NativeRestorableIOWriter tsFileIoWriter;
 
   private ConcurrentLinkedQueue ioTaskQueue = new ConcurrentLinkedQueue();
-  private ConcurrentLinkedQueue memoryTaskQueue = new ConcurrentLinkedQueue();
+  private ConcurrentLinkedQueue encodingTaskQueue = new ConcurrentLinkedQueue();
   private String storageGroup;
 
   private Consumer<IMemTable> flushCallBack;
   private IMemTable memTable;
   private FileSchema fileSchema;
 
-  private volatile boolean memoryFlushNoMoreTask = false;
-  private volatile boolean ioFlushTaskCanStop = false;
+  private volatile boolean noMoreEncodingTask = false;
+  private volatile boolean noMoreIOTask = false;
 
   public MemTableFlushTaskV2(IMemTable memTable, FileSchema fileSchema, NativeRestorableIOWriter writer, String storageGroup,
       Consumer<IMemTable> callBack) {
@@ -61,8 +60,8 @@ public class MemTableFlushTaskV2 {
     this.tsFileIoWriter = writer;
     this.storageGroup = storageGroup;
     this.flushCallBack = callBack;
-    subTaskPoolManager.submit(encodingTask);
-    this.ioFlushTaskFuture = subTaskPoolManager.submit(ioFlushTask);
+    subTaskPoolManager.submit(EncodingTask);
+    this.ioFlushTaskFuture = subTaskPoolManager.submit(IOTask);
     LOGGER.info("flush task of Storage group {} memtable {} is created ",
         storageGroup, memTable.getVersion());
   }
@@ -74,7 +73,7 @@ public class MemTableFlushTaskV2 {
   public void flushMemTable() {
     long sortTime = 0;
     for (String deviceId : memTable.getMemTableMap().keySet()) {
-      memoryTaskQueue.add(deviceId);
+      encodingTaskQueue.add(deviceId);
       int seriesNumber = memTable.getMemTableMap().get(deviceId).size();
       for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
         long startTime = System.currentTimeMillis();
@@ -83,11 +82,11 @@ public class MemTableFlushTaskV2 {
         MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
         DeduplicatedSortedData sortedTimeValuePairs = series.getDeduplicatedSortedData();
         sortTime += System.currentTimeMillis() - startTime;
-        memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
+        encodingTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
       }
-      memoryTaskQueue.add(new ChunkGroupIoTask(seriesNumber, deviceId, memTable.getVersion()));
+      encodingTaskQueue.add(new ChunkGroupIoTask(seriesNumber, deviceId, memTable.getVersion()));
     }
-    memoryFlushNoMoreTask = true;
+    noMoreEncodingTask = true;
     LOGGER.info(
         "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
         storageGroup, memTable.getVersion(), sortTime);
@@ -103,27 +102,27 @@ public class MemTableFlushTaskV2 {
   }
 
 
-  private Runnable encodingTask = new Runnable() {
+  private Runnable EncodingTask = new Runnable() {
     @Override
     public void run() {
       try {
         long memSerializeTime = 0;
-        boolean returnWhenNoTask = false;
-        LOGGER.info("Storage group {} memtable {}, starts to serialize data into mem.", storageGroup,
+        boolean noMoreMessages = false;
+        LOGGER.info("Storage group {} memtable {}, starts to encoding data.", storageGroup,
             memTable.getVersion());
         while (true) {
-          if (memoryFlushNoMoreTask) {
-            returnWhenNoTask = true;
+          if (noMoreEncodingTask) {
+            noMoreMessages = true;
           }
-          Object task = memoryTaskQueue.poll();
+          Object task = encodingTaskQueue.poll();
           if (task == null) {
-            if (returnWhenNoTask) {
+            if (noMoreMessages) {
               break;
             }
             try {
               Thread.sleep(10);
             } catch (InterruptedException e) {
-              LOGGER.error("Storage group {} memtable {}, io flush task is interrupted.",
+              LOGGER.error("Storage group {} memtable {}, encoding task is interrupted.",
                   storageGroup, memTable.getVersion(), e);
             }
           } else {
@@ -133,16 +132,16 @@ public class MemTableFlushTaskV2 {
               ioTaskQueue.add(task);
             } else {
               long starTime = System.currentTimeMillis();
-              Pair<DeduplicatedSortedData, MeasurementSchema> memorySerializeTask = (Pair<DeduplicatedSortedData, MeasurementSchema>) task;
-              ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
-              IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
+              Pair<DeduplicatedSortedData, MeasurementSchema> encodingMessage = (Pair<DeduplicatedSortedData, MeasurementSchema>) task;
+              ChunkBuffer chunkBuffer = new ChunkBuffer(encodingMessage.right);
+              IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right, chunkBuffer,
                   PAGE_SIZE_THRESHOLD);
               try {
-                writeOneSeries(memorySerializeTask.left, seriesWriter,
-                    memorySerializeTask.right.getType());
+                writeOneSeries(encodingMessage.left, seriesWriter,
+                    encodingMessage.right.getType());
                 ioTaskQueue.add(seriesWriter);
               } catch (IOException e) {
-                LOGGER.error("Storage group {} memtable {}, io error.", storageGroup,
+                LOGGER.error("Storage group {} memtable {}, encoding task error.", storageGroup,
                     memTable.getVersion(), e);
                 throw new RuntimeException(e);
               }
@@ -150,8 +149,8 @@ public class MemTableFlushTaskV2 {
             }
           }
         }
-        ioFlushTaskCanStop = true;
-        LOGGER.info("Storage group {}, flushing memtable {} into disk: serialize data into mem cost "
+        noMoreIOTask = true;
+        LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding data cost "
                 + "{} ms.",
             storageGroup, memTable.getVersion(), memSerializeTime);
       } catch (RuntimeException e) {
@@ -163,7 +162,7 @@ public class MemTableFlushTaskV2 {
 
   //TODO a better way is: for each TsFile, assign it a Executors.singleThreadPool,
   // rather than per each memtable.
-  private Runnable ioFlushTask = new Runnable() {
+  private Runnable IOTask = new Runnable() {
     @Override
     public void run() {
       try {
@@ -171,11 +170,11 @@ public class MemTableFlushTaskV2 {
         boolean returnWhenNoTask = false;
         LOGGER.info("Storage group {} memtable {}, start io.", storageGroup, memTable.getVersion());
         while (true) {
-          if (ioFlushTaskCanStop) {
+          if (noMoreIOTask) {
             returnWhenNoTask = true;
           }
-          Object ioTask = ioTaskQueue.poll();
-          if (ioTask == null) {
+          Object ioMessage = ioTaskQueue.poll();
+          if (ioMessage == null) {
             if (returnWhenNoTask) {
               break;
             }
@@ -188,12 +187,12 @@ public class MemTableFlushTaskV2 {
           } else {
             long starTime = System.currentTimeMillis();
             try {
-              if (ioTask instanceof String) {
-                tsFileIoWriter.startChunkGroup((String) ioTask);
-              } else if (ioTask instanceof IChunkWriter) {
-                ((IChunkWriter) ioTask).writeToFileWriter(tsFileIoWriter);
+              if (ioMessage instanceof String) {
+                tsFileIoWriter.startChunkGroup((String) ioMessage);
+              } else if (ioMessage instanceof IChunkWriter) {
+                ((IChunkWriter) ioMessage).writeToFileWriter(tsFileIoWriter);
               } else {
-                ChunkGroupIoTask endGroupTask = (ChunkGroupIoTask) ioTask;
+                ChunkGroupIoTask endGroupTask = (ChunkGroupIoTask) ioMessage;
                 tsFileIoWriter.endChunkGroup(endGroupTask.version);
                 endGroupTask.finished = true;
               }
@@ -208,7 +207,7 @@ public class MemTableFlushTaskV2 {
         LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
             storageGroup, ioTime);
       } catch (RuntimeException e) {
-        LOGGER.error("ioflush thread is dead", e);
+        LOGGER.error("io thread is dead", e);
       }
     }
   };
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index ce00700..6d7619c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -157,7 +157,7 @@ public class UnsealedTsFileProcessorV2Test {
               String deviceId = startTime.getKey();
               resource.getEndTimeMap().put(deviceId, resource.getStartTimeMap().get(deviceId));
             }
-            resource.setClosed(true);
+            resource.close(true);
           }
         }, ()->true);