You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/02/04 08:45:07 UTC

[iotdb] 02/02: correct

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch pipeline_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7d1be89c569000c33da67f8197f2255c3295816a
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Feb 4 16:44:34 2021 +0800

    correct
---
 .../main/java/org/apache/iotdb/SessionExample.java |  37 ++++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  12 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   | 165 ++++++++++++---------
 .../org/apache/iotdb/tsfile/utils/PublicBAOS.java  |   5 +
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  |  12 +-
 5 files changed, 144 insertions(+), 87 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 5e97342..a8916cf 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -44,10 +44,10 @@ public class SessionExample {
 
     long MAX_ROW_NUM = Long.parseLong(args[0]);
     System.out.println("MAX_ROW_NUM: " + MAX_ROW_NUM);
-    insertTablet(MAX_ROW_NUM);
+    insertSeqTablet(MAX_ROW_NUM);
     session.close();
   }
-  private static void insertTablet(long MAX_ROW_NUM) throws IoTDBConnectionException, StatementExecutionException {
+  private static void insertRandomTablet(long MAX_ROW_NUM) throws IoTDBConnectionException, StatementExecutionException {
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
     List<MeasurementSchema> schemaList = new ArrayList<>();
@@ -77,4 +77,37 @@ public class SessionExample {
       tablet.reset();
     }
   }
+
+  private static void insertSeqTablet(long MAX_ROW_NUM) throws IoTDBConnectionException, StatementExecutionException {
+    // The schema of the measurements of one device.
+    // Only the measurementId and data type in MeasurementSchema take effect in a Tablet.
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      schemaList.add(new MeasurementSchema("s" + i, TSDataType.DOUBLE));
+    }
+
+    Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 1000);
+
+    // Method 1 of adding tablet data: fill the tablet row by row
+    long timestamp = System.currentTimeMillis();
+
+    for (long row = 0; row < MAX_ROW_NUM; row++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp);
+      for (int s = 0; s < 1000; s++) {
+        double value = new Random().nextDouble();
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+      }
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        session.insertTablet(tablet, true);
+        tablet.reset();
+      }
+      timestamp++;
+    }
+
+    if (tablet.rowSize != 0) {
+      session.insertTablet(tablet);
+      tablet.reset();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 988469a..665f7e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -131,17 +131,17 @@ public class IoTDBConfig {
   /**
    * Memory allocated for the write process
    */
-  private long allocateMemoryForWrite = Runtime.getRuntime().maxMemory() * 4 / 10;
+  private long allocateMemoryForWrite = Runtime.getRuntime().maxMemory() * 6 / 10;
 
   /**
    * Memory allocated for the read process
    */
-  private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
+  private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() / 10;
 
   /**
    * Memory allocated for the mtree
    */
-  private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() * 1 / 10;
+  private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() / 10;
 
   /**
    * Memory allocated for the read process besides cache
@@ -301,7 +301,7 @@ public class IoTDBConfig {
   /**
    * Is the write mem control for writing enable.
    */
-  private boolean enableMemControl = true;
+  private boolean enableMemControl = false;
 
   /**
    * Is the write ahead log enable.
@@ -343,12 +343,12 @@ public class IoTDBConfig {
   /**
    * When a memTable's size (in byte) exceeds this, the memtable is flushed to disk.
    */
-  private long memtableSizeThreshold = 1024 * 1024 * 1024L;
+  private long memtableSizeThreshold = 4 * 1024 * 1024 * 1024L;
 
   /**
    * When average series point number reaches this, flush the memtable to disk
    */
-  private int avgSeriesPointNumberThreshold = 100000;
+  private int avgSeriesPointNumberThreshold = 100000000;
 
   /**
    * Work when tsfile_manage_strategy is level_strategy. When merge point number reaches this, merge
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index b06e170..5f2e218 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -19,16 +19,19 @@
 package org.apache.iotdb.db.engine.flush;
 
 import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
 import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
@@ -43,18 +46,23 @@ public class MemTableFlushTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
   private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager
       .getInstance();
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final Future<?> encodingTaskFuture;
   private final Future<?> ioTaskFuture;
   private RestorableTsFileIOWriter writer;
 
-  private final ConcurrentLinkedQueue<Object> ioTaskQueue = new ConcurrentLinkedQueue<>();
-  private final ConcurrentLinkedQueue<Object> encodingTaskQueue = new ConcurrentLinkedQueue<>();
+  private final LinkedBlockingQueue<Object> encodingTaskQueue = new LinkedBlockingQueue<>();
+  private final LinkedBlockingQueue<Object> ioTaskQueue = (config.isEnableMemControl()
+      && SystemInfo.getInstance().isEncodingFasterThanIo())
+      ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
+      : new LinkedBlockingQueue<>();
+
   private String storageGroup;
 
   private IMemTable memTable;
 
-  private volatile boolean noMoreEncodingTask = false;
-  private volatile boolean noMoreIOTask = false;
+  private long memSerializeTime = 0L;
+  private long ioTime = 0L;
 
   /**
    * @param memTable the memTable to flush
@@ -81,12 +89,19 @@ public class MemTableFlushTask {
         storageGroup,
         memTable.memSize(),
         memTable.getTotalPointsNum() / memTable.getSeriesNumber());
+
+    long estimatedTemporaryMemSize = 0L;
+    if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) {
+      estimatedTemporaryMemSize = memTable.memSize() / memTable.getSeriesNumber()
+          * config.getIoTaskQueueSizeForFlushing();
+      SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+    }
     long start = System.currentTimeMillis();
     long sortTime = 0;
 
     //for map do not use get(key) to iteratate
     for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) {
-      encodingTaskQueue.add(new StartFlushGroupIOTask(memTableEntry.getKey()));
+      encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey()));
 
       final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
@@ -95,13 +110,12 @@ public class MemTableFlushTask {
         MeasurementSchema desc = series.getSchema();
         TVList tvList = series.getSortedTVListForFlush();
         sortTime += System.currentTimeMillis() - startTime;
-        encodingTaskQueue.add(new Pair<>(tvList, desc));
+        encodingTaskQueue.put(new Pair<>(tvList, desc));
       }
 
-      encodingTaskQueue.add(new EndChunkGroupIoTask());
+      encodingTaskQueue.put(new EndChunkGroupIoTask());
     }
-
-    noMoreEncodingTask = true;
+    encodingTaskQueue.put(new TaskEnd());
     LOGGER.info(
         "Storage group {} memtable flushing into file {}: data sort time cost {} ms.",
         storageGroup, writer.getFile().getName(), sortTime);
@@ -109,8 +123,6 @@ public class MemTableFlushTask {
     try {
       encodingTaskFuture.get();
     } catch (InterruptedException | ExecutionException e) {
-      // avoid ioTask waiting forever
-      noMoreIOTask = true;
       ioTaskFuture.cancel(true);
       throw e;
     }
@@ -123,6 +135,13 @@ public class MemTableFlushTask {
       throw new ExecutionException(e);
     }
 
+    if (config.isEnableMemControl()) {
+      if (estimatedTemporaryMemSize != 0) {
+        SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+      }
+      SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= memSerializeTime);
+    }
+
     LOGGER.info(
         "Storage group {} memtable {} flushing a memtable has finished! Time consumption: {}ms",
         storageGroup, memTable, System.currentTimeMillis() - start);
@@ -169,44 +188,53 @@ public class MemTableFlushTask {
     @SuppressWarnings("squid:S135")
     @Override
     public void run() {
-      long memSerializeTime = 0;
-      boolean noMoreMessages = false;
       LOGGER.debug("Storage group {} memtable flushing to file {} starts to encoding data.",
           storageGroup, writer.getFile().getName());
       while (true) {
-        if (noMoreEncodingTask) {
-          noMoreMessages = true;
+
+        Object task;
+        try {
+          task = encodingTaskQueue.take();
+        } catch (InterruptedException e1) {
+          LOGGER.error("Take task into ioTaskQueue Interrupted");
+          Thread.currentThread().interrupt();
+          break;
         }
-        Object task = encodingTaskQueue.poll();
-        if (task == null) {
-          if (noMoreMessages) {
-            break;
-          }
+        if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) {
           try {
-            TimeUnit.MILLISECONDS.sleep(10);
+            ioTaskQueue.put(task);
           } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
             LOGGER.error("Storage group {} memtable flushing to file {}, encoding task is interrupted.",
                 storageGroup, writer.getFile().getName(), e);
             // generally it is because the thread pool is shutdown so the task should be aborted
             break;
           }
+        } else if (task instanceof TaskEnd) {
+          break;
         } else {
-          if (task instanceof StartFlushGroupIOTask || task instanceof EndChunkGroupIoTask) {
-            ioTaskQueue.add(task);
-          } else {
-            long starTime = System.currentTimeMillis();
-            Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
-            IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
-            writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
-            seriesWriter.sealCurrentPage();
-            seriesWriter.clearPageWriter();
-            memSerializeTime += System.currentTimeMillis() - starTime;
-            ioTaskQueue.add(seriesWriter);
+          long starTime = System.currentTimeMillis();
+          Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
+          IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
+          writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
+          seriesWriter.sealCurrentPage();
+          seriesWriter.clearPageWriter();
+          try {
+            ioTaskQueue.put(seriesWriter);
+          } catch (InterruptedException e) {
+            LOGGER.error("Put task into ioTaskQueue Interrupted");
+            Thread.currentThread().interrupt();
           }
+          memSerializeTime += System.currentTimeMillis() - starTime;
         }
       }
-      noMoreIOTask = true;
-      LOGGER.info("Storage group {}, flushing memtable into file {}: Encoding data cost "
+      try {
+        ioTaskQueue.put(new TaskEnd());
+      } catch (InterruptedException e) {
+        LOGGER.error("Put task into ioTaskQueue Interrupted");
+        Thread.currentThread().interrupt();
+      }
+
+      LOGGER.info("Storage group {}, flushing memtable {} into disk: Encoding data cost "
               + "{} ms.",
           storageGroup, writer.getFile().getName(), memSerializeTime);
     }
@@ -214,52 +242,49 @@ public class MemTableFlushTask {
 
   @SuppressWarnings("squid:S135")
   private Runnable ioTask = () -> {
-    long ioTime = 0;
-    boolean returnWhenNoTask = false;
     LOGGER.debug("Storage group {} memtable flushing to file {} start io.",
         storageGroup, writer.getFile().getName());
     while (true) {
-      if (noMoreIOTask) {
-        returnWhenNoTask = true;
+      Object ioMessage;
+      try {
+        ioMessage = ioTaskQueue.take();
+      } catch (InterruptedException e1) {
+        LOGGER.error("take task from ioTaskQueue Interrupted");
+        Thread.currentThread().interrupt();
+        break;
       }
-      Object ioMessage = ioTaskQueue.poll();
-      if (ioMessage == null) {
-        if (returnWhenNoTask) {
+      long starTime = System.currentTimeMillis();
+      try {
+        if (ioMessage instanceof StartFlushGroupIOTask) {
+          this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
+        } else if (ioMessage instanceof TaskEnd) {
           break;
+        } else if (ioMessage instanceof IChunkWriter) {
+          ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
+          chunkWriter.writeToFileWriter(this.writer);
+        } else {
+          this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
+          this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
+          this.writer.endChunkGroup();
         }
-        try {
-          TimeUnit.MILLISECONDS.sleep(10);
-        } catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
-          LOGGER.error("Storage group {} memtable flushing to file {}, io task is interrupted.",
-              storageGroup, writer.getFile().getName());
-          // generally it is because the thread pool is shutdown so the task should be aborted
-          break;
-        }
-      } else {
-        long starTime = System.currentTimeMillis();
-        try {
-          if (ioMessage instanceof StartFlushGroupIOTask) {
-//            this.writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
-          } else if (ioMessage instanceof IChunkWriter) {
-            ChunkWriterImpl chunkWriter = (ChunkWriterImpl) ioMessage;
-//            chunkWriter.writeToFileWriter(this.writer);
-          } else {
-//            this.writer.setMinPlanIndex(memTable.getMinPlanIndex());
-//            this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex());
-            this.writer.endChunkGroup();
-          }
-        } catch (IOException e) {
-          LOGGER.error("Storage group {} memtable flushing to file {}, io task meets error.",
-              storageGroup, writer.getFile().getName(), e);
-          throw new FlushRunTimeException(e);
-        }
-        ioTime += System.currentTimeMillis() - starTime;
+      } catch (IOException e) {
+        LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
+            memTable, e);
+        throw new FlushRunTimeException(e);
       }
+      ioTime += System.currentTimeMillis() - starTime;
     }
     LOGGER.info("flushing a memtable to file {} in storage group {}, io cost {}ms",
         writer.getFile().getName(), storageGroup, ioTime);
   };
 
+  static class TaskEnd {
+
+    TaskEnd() {
+
+    }
+  }
+
   static class EndChunkGroupIoTask {
 
     EndChunkGroupIoTask() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index de4c985..085bcef 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -79,4 +79,9 @@ public class PublicBAOS extends ByteArrayOutputStream {
   public void reset() {
     count = 0;
   }
+
+  @Override
+  public int size() {
+    return count;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index a1968c8..0b92c6f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -52,11 +52,6 @@ public class ChunkWriterImpl implements IChunkWriter {
    */
   private PublicBAOS pageBuffer;
 
-  /**
-   * current chunk data size, i.e the size of pageBuffer
-   */
-  private int chunkDataSize;
-
   private int numOfPages;
 
   /**
@@ -356,7 +351,6 @@ public class ChunkWriterImpl implements IChunkWriter {
     if (pageWriter != null && pageWriter.getPointNumber() > 0) {
       writePageToPageBuffer();
     }
-    chunkDataSize = pageBuffer.size();
   }
   
   public void clearPageWriter() {
@@ -424,7 +418,7 @@ public class ChunkWriterImpl implements IChunkWriter {
 
     // start to write this column chunk
     writer.startFlushChunk(measurementSchema, compressor.getType(), measurementSchema.getType(),
-        measurementSchema.getEncodingType(), statistics, chunkDataSize, numOfPages);
+        measurementSchema.getEncodingType(), statistics, pageBuffer.size(), numOfPages);
 
     long dataOffset = writer.getPos();
 
@@ -432,10 +426,10 @@ public class ChunkWriterImpl implements IChunkWriter {
     writer.writeBytesToStream(pageBuffer);
 
     int dataSize = (int) (writer.getPos() - dataOffset);
-    if (dataSize != chunkDataSize) {
+    if (dataSize != pageBuffer.size()) {
       throw new IOException(
           "Bytes written is inconsistent with the size of data: " + dataSize + " !="
-              + " " + chunkDataSize);
+              + " " + pageBuffer.size());
     }
 
     writer.endCurrentChunk();