You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/06/07 04:12:27 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: change flushTask as 3 parts (using 3 threads)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 8c4d79f  change flushTask as 3 parts (using 3 threads)
8c4d79f is described below

commit 8c4d79fb3a58cf3d9fafffe5be16ada8d4c26c86
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Fri Jun 7 12:12:16 2019 +0800

    change flushTask as 3 parts (using 3 threads)
---
 .../engine/bufferwrite/BufferWriteProcessor.java   |   5 +-
 .../db/engine/memtable/MemTableFlushTask.java      | 228 +++++++++++++++++++++
 .../db/engine/memtable/MemTableFlushUtil.java      |   3 -
 .../bufferwrite/RestorableTsFileIOWriterTest.java  |  10 +-
 4 files changed, 239 insertions(+), 7 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 2a458fd..03b6c9e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
+import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.pool.FlushManager;
@@ -317,8 +318,8 @@ public class BufferWriteProcessor extends Processor {
     try {
       if (flushMemTable != null && !flushMemTable.isEmpty()) {
         // flush data
-        MemTableFlushUtil.flushMemTable(fileSchema, writer, flushMemTable,
-            version);
+        MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, getProcessorName());
+        tableFlushTask.flushMemTable(fileSchema, flushMemTable, version);
         // write restore information
         writer.flush();
       }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
new file mode 100644
index 0000000..7b28992
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.memtable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemTableFlushTask {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
+  private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
+
+  private TsFileIOWriter tsFileIoWriter;
+
+  private ConcurrentLinkedQueue ioTaskQueue = new ConcurrentLinkedQueue();
+  private ConcurrentLinkedQueue memoryTaskQueue = new ConcurrentLinkedQueue();
+  private boolean stop = false;
+  private String processName;
+
+  //the position of the tsfile when a flush task begins.
+  long startPos;
+
+  //TODO more better way is: for each TsFile, assign it a Executors.singleThreadPool,
+  // rather than per each memtable.
+  private Thread ioFlushThread = new Thread(() -> {
+    long ioTime = 0;
+    while (!stop) {
+      Object seriesWriterOrEndChunkGroupTask = ioTaskQueue.poll();
+      if (seriesWriterOrEndChunkGroupTask == null ) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          LOGGER.error("BufferWrite Processor {}, io flush task is interrupted.", processName, e);
+        }
+      } else {
+        long starTime = System.currentTimeMillis();
+        try {
+          if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
+            ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
+          } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
+            tsFileIoWriter.startFlushChunkGroup((String)seriesWriterOrEndChunkGroupTask);
+          } else {
+            ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
+            long memSize = tsFileIoWriter.getPos() - startPos;
+            ChunkGroupFooter footer = new ChunkGroupFooter(task.deviceId, memSize, task.seriesNumber);
+            tsFileIoWriter.endChunkGroup(footer, task.version);
+            startPos = tsFileIoWriter.getPos();
+            task.finished = true;
+          }
+        } catch (IOException e) {
+          LOGGER.error("BufferWrite Processor {}, io error.", processName, e);
+          throw new RuntimeException(e);
+        }
+        ioTime += System.currentTimeMillis() - starTime;
+      }
+    }
+    LOGGER.info("BufferWrite Processor {}, flushing a memtable into disk:  io cost {} ms.", processName, ioTime );
+  });
+
+  private Thread memoryFlushThread = new Thread(() -> {
+    long memSerializeTime = 0;
+    while (!stop) {
+      Object task = memoryTaskQueue.poll();
+      if (task == null ) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          LOGGER.error("BufferWrite Processor {}, io flush task is interrupted.", processName, e);
+        }
+      } else {
+        if (task instanceof  String) {
+          ioTaskQueue.add(task);
+        } else if (task instanceof ChunkGroupIoTask) {
+          ioTaskQueue.add(task);
+        }else {
+          long starTime = System.currentTimeMillis();
+          Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
+          ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
+          IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
+              PAGE_SIZE_THRESHOLD);
+          try {
+            writeOneSeries(memorySerializeTask.left, seriesWriter,
+                memorySerializeTask.right.getType());
+            ioTaskQueue.add(seriesWriter);
+          } catch (IOException e) {
+            LOGGER.error("BufferWrite Processor {}, io error.", processName, e);
+            throw new RuntimeException(e);
+          }
+          memSerializeTime += System.currentTimeMillis() - starTime;
+        }
+      }
+    }
+    LOGGER.info("BufferWrite Processor {},flushing a memtable into disk: serialize data into mem cost {} ms.", processName, memSerializeTime );
+  });
+
+  public MemTableFlushTask(TsFileIOWriter writer, String processName) {
+    this.tsFileIoWriter = writer;
+    this.processName = processName;
+    ioFlushThread.start();
+    memoryFlushThread.start();
+  }
+
+  private  void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
+      TSDataType dataType)
+      throws IOException {
+    for (TimeValuePair timeValuePair : tvPairs) {
+      switch (dataType) {
+        case BOOLEAN:
+            seriesWriterImpl
+                    .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
+          break;
+        case INT32:
+            seriesWriterImpl.write(timeValuePair.getTimestamp(),
+                    timeValuePair.getValue().getInt());
+          break;
+        case INT64:
+            seriesWriterImpl.write(timeValuePair.getTimestamp(),
+                    timeValuePair.getValue().getLong());
+          break;
+        case FLOAT:
+            seriesWriterImpl.write(timeValuePair.getTimestamp(),
+                    timeValuePair.getValue().getFloat());
+          break;
+        case DOUBLE:
+            seriesWriterImpl
+                    .write(timeValuePair.getTimestamp(),
+                            timeValuePair.getValue().getDouble());
+          break;
+        case TEXT:
+            seriesWriterImpl
+                    .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
+          break;
+        default:
+          LOGGER.error("BufferWrite Processor {}, don't support data type: {}", processName, dataType);
+          break;
+      }
+    }
+  }
+
+  /**
+   * the function for flushing memtable.
+   */
+  public  void flushMemTable(FileSchema fileSchema,
+      IMemTable imemTable, long version) throws IOException {
+    long sortTime = 0;
+    startPos = tsFileIoWriter.getPos();
+    ChunkGroupIoTask theLastTask = EMPTY_TASK;
+    for (String deviceId : imemTable.getMemTableMap().keySet()) {
+      memoryTaskQueue.add(deviceId);
+      int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
+      for (String measurementId : imemTable.getMemTableMap().get(deviceId).keySet()) {
+        long startTime = System.currentTimeMillis();
+        // TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
+        IWritableMemChunk series = imemTable.getMemTableMap().get(deviceId).get(measurementId);
+        MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
+        List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
+        sortTime += System.currentTimeMillis() - startTime;
+        memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
+      }
+      theLastTask = new ChunkGroupIoTask(seriesNumber, deviceId, version);
+      memoryTaskQueue.add(theLastTask);
+    }
+    LOGGER.info(
+        "BufferWrite Processor {}, flushing a memtable into disk: data sort time cost {} ms.", processName, sortTime );
+    while (!theLastTask.finished) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        LOGGER.error("BufferWrite Processor {}, flush memtable table thread is interrupted.", processName, e);
+        throw new RuntimeException(e);
+      }
+    }
+    stop = true;
+  }
+
+
+  static class ChunkGroupIoTask {
+    int seriesNumber;
+    String deviceId;
+    long version;
+    boolean finished;
+
+    public ChunkGroupIoTask(int seriesNumber, String deviceId, long version) {
+      this(seriesNumber, deviceId, version, false);
+    }
+    public ChunkGroupIoTask(int seriesNumber, String deviceId, long version, boolean finished) {
+      this.seriesNumber = seriesNumber;
+      this.deviceId = deviceId;
+      this.version = version;
+      this.finished = finished;
+    }
+  }
+  private static ChunkGroupIoTask EMPTY_TASK = new ChunkGroupIoTask(0, "", 0, true);
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
index 8cd0ec7..4aa12e9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
@@ -19,15 +19,12 @@
 package org.apache.iotdb.db.engine.memtable;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 
-import java.util.Map;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
index f2c393a..1f8e3e2 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
@@ -31,6 +31,7 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
 import org.apache.iotdb.db.engine.memtable.MemTableTestUtils;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
@@ -165,7 +166,9 @@ public class RestorableTsFileIOWriterTest {
     memTable.write("d1", "s2", TSDataType.INT32, 3, "1");
     memTable.write("d2", "s2", TSDataType.INT32, 2, "1");
     memTable.write("d2", "s2", TSDataType.INT32, 4, "1");
-    MemTableFlushUtil.flushMemTable(schema, writer, memTable, 0);
+    //MemTableFlushUtil.flushMemTable(schema, writer, memTable, 0);
+    MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, "test");
+    tableFlushTask.flushMemTable(schema, memTable, 0);
     writer.flush();
     writer.appendMetadata();
     writer.getOutput().close();
@@ -218,7 +221,10 @@ public class RestorableTsFileIOWriterTest {
         MemTableTestUtils.measurementId0,
         MemTableTestUtils.dataType0);
 
-    MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable, 0);
+    //MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable, 0);
+    MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, "test");
+    tableFlushTask.flushMemTable(MemTableTestUtils.getFileSchema(), memTable, 0);
+
     writer.flush();
 
     assertEquals(0,