You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/03/15 17:54:53 UTC

[iotdb] branch encoding_parallel updated: run success

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch encoding_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/encoding_parallel by this push:
     new 2526b3b  run success
2526b3b is described below

commit 2526b3b734a3c7d4d0fccb61655f9dac390007a4
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Tue Mar 16 01:54:18 2021 +0800

    run success
---
 .../engine/flush/MultiThreadMemTableFlushTask.java | 33 +++++++++++-----------
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  5 ++++
 .../apache/iotdb/db/engine/memtable/IMemTable.java |  2 ++
 .../db/engine/flush/MemTableFlushTaskTest.java     |  8 +++++-
 4 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
index fd3174b..46a04df 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
@@ -56,20 +56,8 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
 
   int threadSize = 10;
 
-  private final LinkedBlockingQueue<Object>[] encodingTaskQueues =
-      new LinkedBlockingQueue[threadSize];
-  private LinkedBlockingQueue<Object>[] ioTaskQueues =
-      new LinkedBlockingQueue[threadSize]; // this initialization may be wasted.
-
-  {
-    for (int i = 0; i < threadSize; i++) {
-      ioTaskQueues[i] =
-          config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()
-              ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
-              : new LinkedBlockingQueue<>();
-      encodingTaskQueues[i] = new LinkedBlockingQueue<>();
-    }
-  }
+  private final LinkedBlockingQueue<Object>[] encodingTaskQueues;
+  private LinkedBlockingQueue<Object>[] ioTaskQueues;
 
   private String storageGroup;
 
@@ -88,6 +76,19 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
     this.memTable = memTable;
     this.writer = writer;
     this.storageGroup = storageGroup;
+    if (threadSize > memTable.getChunkGroupNumber()) {
+      threadSize = memTable.getChunkGroupNumber();
+    }
+    this.encodingTaskQueues = new LinkedBlockingQueue[threadSize];
+    ioTaskQueues = new LinkedBlockingQueue[threadSize];
+    for (int i = 0; i < threadSize; i++) {
+      ioTaskQueues[i] =
+          config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()
+              ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
+              : new LinkedBlockingQueue<>();
+      encodingTaskQueues[i] = new LinkedBlockingQueue<>();
+    }
+
     this.encodingTaskFutures = submitEncodingTasks();
     this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
     LOGGER.debug(
@@ -372,7 +373,7 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
                 continue;
               }
               for (j = 0; j < threadSize % Byte.SIZE; j++) {
-                if ((finished[threadSize / Byte.SIZE] & BIT_UTIL[j]) != 1) {
+                if ((finished[threadSize / Byte.SIZE] & BIT_UTIL[j]) == 0) {
                   // not finished.
                   break;
                 }
@@ -424,5 +425,5 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
     }
   }
 
-  private static final short[] BIT_UTIL = new short[] {1, 2, 4, 8, 16, 32, 64, 0XFF};
+  private static final byte[] BIT_UTIL = new byte[] {1, 2, 4, 8, 16, 32, 64, -128};
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 21c9bf2..9de3f23 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -208,6 +208,11 @@ public abstract class AbstractMemTable implements IMemTable {
   }
 
   @Override
+  public int getChunkGroupNumber() {
+    return memTableMap.size();
+  }
+
+  @Override
   public long getTotalPointsNum() {
     return totalPointsNum;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index ce412a2..1cc7b2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -76,6 +76,8 @@ public interface IMemTable {
 
   int getSeriesNumber();
 
+  int getChunkGroupNumber();
+
   long getTotalPointsNum();
 
   void insert(InsertRowPlan insertRowPlan);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java
index 387daef..9a65600 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java
@@ -107,11 +107,17 @@ public class MemTableFlushTaskTest {
     }
   }
 
+  // @Test
+  public void test2() {
+    byte a = -128;
+    a &= 0XFF;
+    System.out.println(a);
+  }
+
   @Test
   public void test() throws ExecutionException, InterruptedException, IOException {
     IMemTableFlushTask task = new MultiThreadMemTableFlushTask(memTable, writer, "root.sg");
     task.syncFlushMemTable();
-    System.out.println("end file.....");
     writer.endFile();
     writer.close();