You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/01/30 01:25:13 UTC

[iotdb] branch serial_test created (now f94b072)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch serial_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at f94b072  init

This branch includes the following new commits:

     new f94b072  init

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: init

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch serial_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f94b072c31c88bea8500976b23770c241b6fb500
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Sat Jan 30 09:24:41 2021 +0800

    init
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 248 +++++----------------
 .../writelog/recover/TsFileRecoverPerformer.java   |   3 -
 2 files changed, 61 insertions(+), 190 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index c643f59..1cb13b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -19,21 +19,15 @@
 package org.apache.iotdb.db.engine.flush;
 
 import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
-import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
-import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -44,25 +38,12 @@ import org.slf4j.LoggerFactory;
 public class MemTableFlushTask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
-  private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager
-      .getInstance();
-  private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private final Future<?> encodingTaskFuture;
-  private final Future<?> ioTaskFuture;
-  private RestorableTsFileIOWriter writer;
-
-  private final LinkedBlockingQueue<Object> encodingTaskQueue = new LinkedBlockingQueue<>();
-  private final LinkedBlockingQueue<Object> ioTaskQueue = (config.isEnableMemControl()
-      && SystemInfo.getInstance().isEncodingFasterThanIo())
-          ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
-          : new LinkedBlockingQueue<>();
-
-  private String storageGroup;
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private final RestorableTsFileIOWriter writer;
 
-  private IMemTable memTable;
+  private final String storageGroup;
 
-  private volatile long memSerializeTime = 0L;
-  private volatile long ioTime = 0L;
+  private final IMemTable memTable;
 
   /**
    * @param memTable the memTable to flush
@@ -74,8 +55,6 @@ public class MemTableFlushTask {
     this.memTable = memTable;
     this.writer = writer;
     this.storageGroup = storageGroup;
-    this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
-    this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
     LOGGER.debug("flush task of Storage group {} memtable is created, flushing to file {}.",
               storageGroup, writer.getFile().getName());
   }
@@ -83,8 +62,7 @@ public class MemTableFlushTask {
   /**
    * the function for flushing memtable.
    */
-  public void syncFlushMemTable()
-      throws ExecutionException, InterruptedException {
+  public void syncFlushMemTable() throws ExecutionException, IOException {
     LOGGER.info("The memTable size of SG {} is {}, the avg series points num in chunk is {} ",
         storageGroup,
         memTable.memSize(),
@@ -98,10 +76,12 @@ public class MemTableFlushTask {
     }
     long start = System.currentTimeMillis();
     long sortTime = 0;
+    long encodingTime = 0;
+    long ioTime = 0;
 
     //for map do not use get(key) to iteratate
     for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) {
-      encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey()));
+      this.writer.startChunkGroup(memTableEntry.getKey());
 
       final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
@@ -109,25 +89,36 @@ public class MemTableFlushTask {
         IWritableMemChunk series = iWritableMemChunkEntry.getValue();
         MeasurementSchema desc = series.getSchema();
         TVList tvList = series.getSortedTVListForFlush();
-        sortTime += System.currentTimeMillis() - startTime;
-        encodingTaskQueue.put(new Pair<>(tvList, desc));
+        long encodingStartTime = System.currentTimeMillis();
+        sortTime += encodingStartTime - startTime;
+        IChunkWriter seriesWriter = new ChunkWriterImpl(desc);
+        writeOneSeries(tvList, seriesWriter, desc.getType());
+        seriesWriter.sealCurrentPage();
+        seriesWriter.clearPageWriter();
+        long ioStartTime = System.currentTimeMillis();
+        encodingTime += ioStartTime - encodingStartTime;
+        seriesWriter.writeToFileWriter(this.writer);
+        ioTime += System.currentTimeMillis() - ioStartTime;
       }
-
-      encodingTaskQueue.put(new EndChunkGroupIoTask());
+      long ioStartTime = System.currentTimeMillis();
+      this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
+      this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
+      this.writer.endChunkGroup();
+      ioTime += System.currentTimeMillis() - ioStartTime;
     }
-    encodingTaskQueue.put(new TaskEnd());
-    LOGGER.debug(
+
+    LOGGER.info(
         "Storage group {} memtable flushing into file {}: data sort time cost {} ms.",
         storageGroup, writer.getFile().getName(), sortTime);
 
-    try {
-      encodingTaskFuture.get();
-    } catch (InterruptedException | ExecutionException e) {
-      ioTaskFuture.cancel(true);
-      throw e;
-    }
+    LOGGER.info(
+        "Storage group {} memtable flushing into file {}: data encoding time cost {} ms.",
+        storageGroup, writer.getFile().getName(), encodingTime);
+
+    LOGGER.info(
+        "Storage group {} memtable flushing into file {}: disk io time cost {} ms.",
+        storageGroup, writer.getFile().getName(), ioTime);
 
-    ioTaskFuture.get();
 
     try {
       writer.writePlanIndices();
@@ -139,7 +130,7 @@ public class MemTableFlushTask {
       if (estimatedTemporaryMemSize != 0) {
         SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
       }
-      SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= memSerializeTime);
+      SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= encodingTime);
     }
 
     LOGGER.info(
@@ -147,157 +138,40 @@ public class MemTableFlushTask {
         storageGroup, memTable, System.currentTimeMillis() - start);
   }
 
-  private Runnable encodingTask = new Runnable() {
-    private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
-        TSDataType dataType) {
-      for (int i = 0; i < tvPairs.size(); i++) {
-        long time = tvPairs.getTime(i);
+  private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
+      TSDataType dataType) {
+    for (int i = 0; i < tvPairs.size(); i++) {
+      long time = tvPairs.getTime(i);
 
-        // skip duplicated data
-        if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) {
-          continue;
-        }
-
-        switch (dataType) {
-          case BOOLEAN:
-            seriesWriterImpl.write(time, tvPairs.getBoolean(i));
-            break;
-          case INT32:
-            seriesWriterImpl.write(time, tvPairs.getInt(i));
-            break;
-          case INT64:
-            seriesWriterImpl.write(time, tvPairs.getLong(i));
-            break;
-          case FLOAT:
-            seriesWriterImpl.write(time, tvPairs.getFloat(i));
-            break;
-          case DOUBLE:
-            seriesWriterImpl.write(time, tvPairs.getDouble(i));
-            break;
-          case TEXT:
-            seriesWriterImpl.write(time, tvPairs.getBinary(i));
-            break;
-          default:
-            LOGGER.error("Storage group {} does not support data type: {}", storageGroup,
-                dataType);
-            break;
-        }
+      // skip duplicated data
+      if ((i + 1 < tvPairs.size() && (time == tvPairs.getTime(i + 1)))) {
+        continue;
       }
-    }
 
-    @SuppressWarnings("squid:S135")
-    @Override
-    public void run() {
-      LOGGER.debug("Storage group {} memtable flushing to file {} starts to encoding data.",
-          storageGroup, writer.getFile().getName());
-      while (true) {
-
-        Object task = null;
-        try {
-          task = encodingTaskQueue.take();
-        } catch (InterruptedException e1) {
-          LOGGER.error("Take task into ioTaskQueue Interrupted");
-          Thread.currentThread().interrupt();
+      switch (dataType) {
+        case BOOLEAN:
+          seriesWriterImpl.write(time, tvPairs.getBoolean(i));
           break;
-        }
-        if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) {
-          try {
-            ioTaskQueue.put(task);
-          } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
-            LOGGER.error("Storage group {} memtable flushing to file {}, encoding task is interrupted.",
-                storageGroup, writer.getFile().getName(), e);
-            // generally it is because the thread pool is shutdown so the task should be aborted
-            break;
-          }
-        } else if (task instanceof TaskEnd) {
+        case INT32:
+          seriesWriterImpl.write(time, tvPairs.getInt(i));
           break;
-        } else {
-          long starTime = System.currentTimeMillis();
-          Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
-          IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
-          writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
-          seriesWriter.sealCurrentPage();
-          seriesWriter.clearPageWriter();
-          try {
-            ioTaskQueue.put(seriesWriter);
-          } catch (InterruptedException e) {
-            LOGGER.error("Put task into ioTaskQueue Interrupted");
-            Thread.currentThread().interrupt();
-          }
-          memSerializeTime += System.currentTimeMillis() - starTime;
-        }
-      }
-      try {
-        ioTaskQueue.put(new TaskEnd());
-      } catch (InterruptedException e) {
-        LOGGER.error("Put task into ioTaskQueue Interrupted");
-        Thread.currentThread().interrupt();
-      }
-      
-      LOGGER.debug("Storage group {}, flushing memtable {} into disk: Encoding data cost "
-          + "{} ms.",
-          storageGroup, writer.getFile().getName(), memSerializeTime);
-    }
-  };
-
-  @SuppressWarnings("squid:S135")
-  private Runnable ioTask = () -> {
-    LOGGER.debug("Storage group {} memtable flushing to file {} start io.",
-        storageGroup, writer.getFile().getName());
-    while (true) {
-      Object ioMessage = null;
-      try {
-        ioMessage = ioTaskQueue.take();
-      } catch (InterruptedException e1) {
-        LOGGER.error("take task from ioTaskQueue Interrupted");
-        Thread.currentThread().interrupt();
-        break;
-      }
-      long starTime = System.currentTimeMillis();
-      try {
-        if (ioMessage instanceof StartFlushGroupIOTask) {
-          this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
-        } else if (ioMessage instanceof TaskEnd) {
+        case INT64:
+          seriesWriterImpl.write(time, tvPairs.getLong(i));
+          break;
+        case FLOAT:
+          seriesWriterImpl.write(time, tvPairs.getFloat(i));
+          break;
+        case DOUBLE:
+          seriesWriterImpl.write(time, tvPairs.getDouble(i));
+          break;
+        case TEXT:
+          seriesWriterImpl.write(time, tvPairs.getBinary(i));
+          break;
+        default:
+          LOGGER.error("Storage group {} does not support data type: {}", storageGroup,
+              dataType);
           break;
-        } else if (ioMessage instanceof IChunkWriter) {
-          ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
-          chunkWriter.writeToFileWriter(this.writer);
-        } else {
-          this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
-          this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
-          this.writer.endChunkGroup();
-        }
-      } catch (IOException e) {
-        LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
-            memTable, e);
-        throw new FlushRunTimeException(e);
       }
-      ioTime += System.currentTimeMillis() - starTime;
-    }
-    LOGGER.debug("flushing a memtable to file {} in storage group {}, io cost {}ms",
-            writer.getFile().getName(), storageGroup, ioTime);
-  };
-
-  static class TaskEnd {
-
-    TaskEnd() {
-
-    }
-  }
-
-  static class EndChunkGroupIoTask {
-
-    EndChunkGroupIoTask() {
-
-    }
-  }
-
-  static class StartFlushGroupIOTask {
-
-    private final String deviceId;
-
-    StartFlushGroupIOTask(String deviceId) {
-      this.deviceId = deviceId;
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index b2f2bcb..e46ddd1 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -222,9 +222,6 @@ public class TsFileRecoverPerformer {
       // into it
     } catch (IOException | ExecutionException e) {
       throw new StorageGroupProcessorException(e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new StorageGroupProcessorException(e);
     }
   }