You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/06/30 10:28:10 UTC

[incubator-iotdb] branch feature_async_close_tsfile_handle_full_disk_situation created (now 091fc2f)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch feature_async_close_tsfile_handle_full_disk_situation
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 091fc2f  replace flush task runnable with a callable function; and reject future writes if there is no disk spaces any more...

This branch includes the following new commits:

     new 091fc2f  replace flush task runnable with a callable function; and reject future writes if there is no disk spaces any more...

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: replace flush task runnable with a callable function; and reject future writes if there is no disk spaces any more...

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature_async_close_tsfile_handle_full_disk_situation
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 091fc2f47f1f20f9af58732a29611afaa039369b
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sun Jun 30 18:26:15 2019 +0800

    replace flush task runnable with a callable function; and reject future writes if there is no disk spaces any more...
---
 .../db/engine/filenodeV2/FileNodeManagerV2.java    | 29 ++++++++-
 .../iotdb/db/engine/filenodeV2/FlushManager.java   | 12 +++-
 .../filenodeV2/UnsealedTsFileProcessorV2.java      | 42 ++++++------
 .../db/engine/memtable/MemTableFlushTaskV2.java    | 74 +++++++++++++---------
 .../iotdb/db/engine/memtable/MemTablePool.java     |  2 +-
 .../writelog/recover/TsFileRecoverPerformer.java   |  2 +-
 .../iotdb/db/engine/memtable/MemTablePoolTest.java |  2 +-
 7 files changed, 105 insertions(+), 58 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index 233620c..ae58ee7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -56,6 +56,11 @@ public class FileNodeManagerV2 implements IService {
       .getLogger(org.apache.iotdb.db.engine.filenodeV2.FileNodeManagerV2.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
+  /*
+   * whether reject all writes (insert, update, delete)
+   */
+  private boolean rejectWrite = false;
+
   /**
    * a folder (system/info/ by default) that persist FileNodeProcessorStore classes. Ends with
    * File.separator Each FileNodeManager will have a subfolder.
@@ -79,6 +84,14 @@ public class FileNodeManagerV2 implements IService {
    */
   private volatile FileNodeManagerStatus fileNodeManagerStatus = FileNodeManagerStatus.NONE;
 
+  public boolean isRejectWrite() {
+    return rejectWrite;
+  }
+
+  public void setRejectWrite(boolean rejectWrite) {
+    this.rejectWrite = rejectWrite;
+  }
+
   private enum FileNodeManagerStatus {
     NONE, MERGE, CLOSE
   }
@@ -168,7 +181,9 @@ public class FileNodeManagerV2 implements IService {
    * @return an int value represents the insert type, 0: failed; 1: overflow; 2: bufferwrite
    */
   public boolean insert(InsertPlan insertPlan) throws FileNodeManagerException {
-
+    if (rejectWrite) {
+      return false;
+    }
     FileNodeProcessorV2 fileNodeProcessor;
     try {
       fileNodeProcessor = getProcessor(insertPlan.getDeviceId());
@@ -200,22 +215,30 @@ public class FileNodeManagerV2 implements IService {
   /**
    * update data.
    */
-  public void update(String deviceId, String measurementId, long startTime, long endTime,
+  public boolean update(String deviceId, String measurementId, long startTime, long endTime,
       TSDataType type, String v) {
+    if (rejectWrite) {
+      return false;
+    }
     // TODO
+    return false;
   }
 
   /**
    * delete data.
    */
-  public void delete(String deviceId, String measurementId, long timestamp)
+  public boolean delete(String deviceId, String measurementId, long timestamp)
       throws FileNodeManagerException {
+    if (rejectWrite) {
+      return false;
+    }
     FileNodeProcessorV2 fileNodeProcessor = getProcessor(deviceId);
     try {
       fileNodeProcessor.delete(deviceId, measurementId, timestamp);
     } catch (IOException e) {
       throw new FileNodeManagerException(e);
     }
+    return true;
   }
 
   private void delete(String processorName,
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
index 002f809..b878edc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FlushManager.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.io.IOException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
@@ -34,14 +35,15 @@ public class FlushManager {
 
   private FlushPoolManager flushPool = FlushPoolManager.getInstance();
 
-  class FlushThread implements Runnable {
+  class FlushThread implements Callable<Boolean> {
 
     @Override
-    public void run() {
+    public Boolean call() {
       UnsealedTsFileProcessorV2 unsealedTsFileProcessor = unsealedTsFileProcessorQueue.poll();
       long startTime = System.currentTimeMillis();
+      boolean flushSuccessed = false;
       try {
-        unsealedTsFileProcessor.flushOneMemTable();
+        flushSuccessed = unsealedTsFileProcessor.flushOneMemTable();
       } catch (IOException e) {
         LOGGER.error("storage group {} flush one memtable meet error",
             unsealedTsFileProcessor.getStorageGroupName(), e);
@@ -51,6 +53,10 @@ public class FlushManager {
       LOGGER.info("storage group {} flush process consume {} ms",
           unsealedTsFileProcessor.getStorageGroupName(), System.currentTimeMillis() - startTime);
       registerUnsealedTsFileProcessor(unsealedTsFileProcessor);
+      if (!flushSuccessed) {
+        FileNodeManagerV2.getInstance().setRejectWrite(true);
+      }
+      return flushSuccessed;
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 57bafc7..16d675d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -23,13 +23,11 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.memtable.EmptyMemTable;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -41,11 +39,9 @@ import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.utils.QueryUtils;
-import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -119,10 +115,10 @@ public class UnsealedTsFileProcessorV2 {
 
 //    long start1 = System.currentTimeMillis();
     if (workMemTable == null) {
-      // TODO change the impl of getEmptyMemTable to non-blocking
-      workMemTable = MemTablePool.getInstance().getEmptyMemTable(this);
+      // TODO change the impl of getAvailableMemTable to non-blocking
+      workMemTable = MemTablePool.getInstance().getAvailableMemTable(this);
 
-      // no empty memtable, return failure
+      // no available memtable, return failure
       if (workMemTable == null) {
         return false;
       }
@@ -184,6 +180,9 @@ public class UnsealedTsFileProcessorV2 {
 
 
   public boolean shouldFlush() {
+    if (workMemTable == null) {
+      return false;
+    }
     return workMemTable.memSize() > TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
   }
 
@@ -314,48 +313,55 @@ public class UnsealedTsFileProcessorV2 {
    * Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of
    * the flush manager pool
    */
-  void flushOneMemTable() throws IOException {
+  boolean flushOneMemTable() throws IOException {
     IMemTable memTableToFlush;
     memTableToFlush = flushingMemTables.getFirst();
 
     LOGGER.info("storage group {} start to flush a memtable in a flush thread", storageGroupName);
 
-    // null memtable only appears when calling asyncClose()
+    boolean flushSuccessed = false;
+    //if the memtable is not an EmptyMemTable (i.e., the memtable is actually a memtable).
     if (memTableToFlush.isManagedByMemPool()) {
       MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer,
           storageGroupName,
           this::releaseFlushedMemTableCallback);
-      flushTask.flushMemTable();
+      flushSuccessed = flushTask.flushMemTable();
+      if (flushSuccessed) {
 //      long start = System.currentTimeMillis();
-      MemTablePool.getInstance().putBack(memTableToFlush, storageGroupName);
+        MemTablePool.getInstance().putBack(memTableToFlush, storageGroupName);
 //      long elapse = System.currentTimeMillis() - start;
 //      if (elapse > 1000) {
 //        LOGGER.info("release a memtable cost: {}", elapse);
 //      }
-      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-        getLogNode().notifyEndFlush();
+        if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+          getLogNode().notifyEndFlush();
+        }
       }
       LOGGER.info("flush a memtable has finished");
     } else {
+      // the memtable is an EmptyMemTable. it is a signal for indicating asyncClose()
       LOGGER.info(
           "release an empty memtable from flushing memtable list, which is submitted in force flush");
       releaseFlushedMemTableCallback(memTableToFlush);
+      flushSuccessed = true;
     }
 
-    // for sync flush
+    // for notifying syncFlush()
     synchronized (memTableToFlush) {
       memTableToFlush.notify();
     }
 
     if (shouldClose && flushingMemTables.isEmpty()) {
-      endFile();
-
-      // for sync close
+      if (flushSuccessed) {
+        //if !flushSuccessed, then the file may be broken, we do not seal the file.
+        endFile();
+      }
+      // for notifying syncClose()
       synchronized (flushingMemTables) {
         flushingMemTables.notify();
       }
     }
-
+    return flushSuccessed;
   }
 
   private void endFile() throws IOException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index cce9093..d37cf77 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -15,6 +15,7 @@
 package org.apache.iotdb.db.engine.memtable;
 
 import java.io.IOException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -42,7 +43,7 @@ public class MemTableFlushTaskV2 {
   private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
   private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
       .getInstance();
-  private Future ioFlushTaskFuture;
+  private Future<Boolean> ioFlushTaskFuture;
   private NativeRestorableIOWriter tsFileIoWriter;
 
   private ConcurrentLinkedQueue ioTaskQueue = new ConcurrentLinkedQueue();
@@ -72,8 +73,9 @@ public class MemTableFlushTaskV2 {
 
   /**
    * the function for flushing memtable.
+   * this is a synchronized function.
    */
-  public void flushMemTable() {
+  public boolean flushMemTable() {
     long sortTime = 0;
     for (String deviceId : memTable.getMemTableMap().keySet()) {
       encodingTaskQueue.add(deviceId);
@@ -94,14 +96,18 @@ public class MemTableFlushTaskV2 {
         "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
         storageGroup, memTable.getVersion(), sortTime);
 
+    Boolean success = false;
     try {
-      ioFlushTaskFuture.get();
+      success = ioFlushTaskFuture.get();
     } catch (InterruptedException | ExecutionException e) {
       LOGGER.error("Waiting for IO flush task end meets error", e);
     }
-
     LOGGER.info("Storage group {} memtable {} flushing a memtable finished!", storageGroup, memTable);
-    flushCallBack.accept(memTable);
+    if (success) {
+      //only if successed, we use the callback to release the memtable.
+      flushCallBack.accept(memTable);
+    }
+    return success;
   }
 
 
@@ -131,11 +137,16 @@ public class MemTableFlushTaskV2 {
             }
           } else {
             if (task instanceof String) {
+              // the task indicates that a new Chunk Group begins, the value of the task is the deviceId.
+              //so, we just forward the task to the ioTaskQueue
               currDevice = (String) task;
               ioTaskQueue.add(task);
             } else if (task instanceof ChunkGroupIoTask) {
+              //the task indicates that all Chunks in the Chunk Group haven been submitted for encoding.
+              //so, we just forward the task to the  ioTaskQueue
               ioTaskQueue.add(task);
             } else {
+              //the task is for encoding and writing a Chunk into memory buffer.
               long starTime = System.currentTimeMillis();
               Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
               ChunkBuffer chunkBuffer;
@@ -150,6 +161,7 @@ public class MemTableFlushTaskV2 {
               try {
                 writeOneSeries(encodingMessage.left, seriesWriter,
                     encodingMessage.right.getType());
+                //then we submit a task for flushing the memory buffer to the disk
                 ioTaskQueue.add(seriesWriter);
               } catch (IOException e) {
                 LOGGER.error("Storage group {} memtable {}, encoding task error.", storageGroup,
@@ -171,11 +183,9 @@ public class MemTableFlushTaskV2 {
   };
 
 
-  //TODO a better way is: for each TsFile, assign it a Executors.singleThreadPool,
-  // rather than per each memtable.
-  private Runnable IOTask = new Runnable() {
+  private Callable<Boolean> IOTask = new Callable<Boolean>() {
     @Override
-    public void run() {
+    public Boolean call() {
       try {
         long ioTime = 0;
         boolean returnWhenNoTask = false;
@@ -197,35 +207,37 @@ public class MemTableFlushTaskV2 {
             }
           } else {
             long starTime = System.currentTimeMillis();
-            try {
-              if (ioMessage instanceof String) {
-                tsFileIoWriter.startChunkGroup((String) ioMessage);
-              } else if (ioMessage instanceof IChunkWriter) {
-                if (IoTDBDescriptor.getInstance().getConfig().isChunkBufferPoolEnable()) {//chunk buffer pool enable
-                  ChunkWriterImpl writer = (ChunkWriterImpl) ioMessage;
-                  writer.writeToFileWriter(tsFileIoWriter);
-                  ChunkBufferPool.getInstance().putBack(writer.getChunkBuffer());
-                } else {
-                  ((IChunkWriter) ioMessage).writeToFileWriter(tsFileIoWriter);
-                }
+            if (ioMessage instanceof String) {
+              //a new Chunk group begins
+              tsFileIoWriter.startChunkGroup((String) ioMessage);
+            } else if (ioMessage instanceof IChunkWriter) {
+              //writing a memory chunk buffer to the disk
+              if (IoTDBDescriptor.getInstance().getConfig()
+                  .isChunkBufferPoolEnable()) {//chunk buffer pool enable
+                ChunkWriterImpl writer = (ChunkWriterImpl) ioMessage;
+                writer.writeToFileWriter(tsFileIoWriter);
+                ChunkBufferPool.getInstance().putBack(writer.getChunkBuffer());
               } else {
-                ChunkGroupIoTask endGroupTask = (ChunkGroupIoTask) ioMessage;
-                tsFileIoWriter.endChunkGroup(endGroupTask.version);
-                endGroupTask.finished = true;
+                ((IChunkWriter) ioMessage).writeToFileWriter(tsFileIoWriter);
               }
-            } catch (IOException e) {
-              LOGGER.error("Storage group {} memtable {}, io error.", storageGroup,
-                  memTable.getVersion(), e);
-              throw new RuntimeException(e);
+            } else {
+              //finishing a chunk group.
+              ChunkGroupIoTask endGroupTask = (ChunkGroupIoTask) ioMessage;
+              tsFileIoWriter.endChunkGroup(endGroupTask.version);
+              endGroupTask.finished = true;
             }
             ioTime += System.currentTimeMillis() - starTime;
           }
         }
-        LOGGER.info("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
-            storageGroup, ioTime);
-      } catch (RuntimeException e) {
-        LOGGER.error("io thread is dead", e);
+        LOGGER
+            .info("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
+                storageGroup, ioTime);
+      } catch (Exception e) {
+        LOGGER.error("flushing Storage group {} memtable version {} failed.", storageGroup,
+            memTable.getVersion(), e);
+        return false;
       }
+      return true;
     }
   };
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index 75bb1f1..407d8b4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -45,7 +45,7 @@ public class MemTablePool {
   private MemTablePool() {
   }
 
-  public IMemTable getEmptyMemTable(Object applier) {
+  public IMemTable getAvailableMemTable(Object applier) {
     synchronized (availableMemTables) {
       if (availableMemTables.isEmpty() && size < capacity) {
         size++;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index b64d5e7..36890bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -112,7 +112,7 @@ public class TsFileRecoverPerformer {
     // flush logs
     MemTableFlushTaskV2 tableFlushTask = new MemTableFlushTaskV2(recoverMemTable, fileSchema, restorableTsFileIOWriter,
         logNodePrefix, (a) -> {});
-    tableFlushTask.flushMemTable();
+    boolean success = tableFlushTask.flushMemTable();
 
     // close file
     try {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
index 8c8396a..d315d19 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTablePoolTest.java
@@ -44,7 +44,7 @@ public class MemTablePoolTest {
   public void testGetAndRelease() {
     long time = System.currentTimeMillis();
     for (int i = 0; i < 10; i++) {
-      IMemTable memTable = MemTablePool.getInstance().getEmptyMemTable("test case");
+      IMemTable memTable = MemTablePool.getInstance().getAvailableMemTable("test case");
       memTables.add(memTable);
     }
     time -= System.currentTimeMillis();