You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/03/15 17:54:53 UTC
[iotdb] branch encoding_parallel updated: run success
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch encoding_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/encoding_parallel by this push:
new 2526b3b run success
2526b3b is described below
commit 2526b3b734a3c7d4d0fccb61655f9dac390007a4
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Tue Mar 16 01:54:18 2021 +0800
run success
---
.../engine/flush/MultiThreadMemTableFlushTask.java | 33 +++++++++++-----------
.../iotdb/db/engine/memtable/AbstractMemTable.java | 5 ++++
.../apache/iotdb/db/engine/memtable/IMemTable.java | 2 ++
.../db/engine/flush/MemTableFlushTaskTest.java | 8 +++++-
4 files changed, 31 insertions(+), 17 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
index fd3174b..46a04df 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MultiThreadMemTableFlushTask.java
@@ -56,20 +56,8 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
int threadSize = 10;
- private final LinkedBlockingQueue<Object>[] encodingTaskQueues =
- new LinkedBlockingQueue[threadSize];
- private LinkedBlockingQueue<Object>[] ioTaskQueues =
- new LinkedBlockingQueue[threadSize]; // this initialization may be wasted.
-
- {
- for (int i = 0; i < threadSize; i++) {
- ioTaskQueues[i] =
- config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()
- ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
- : new LinkedBlockingQueue<>();
- encodingTaskQueues[i] = new LinkedBlockingQueue<>();
- }
- }
+ private final LinkedBlockingQueue<Object>[] encodingTaskQueues;
+ private LinkedBlockingQueue<Object>[] ioTaskQueues;
private String storageGroup;
@@ -88,6 +76,19 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
this.memTable = memTable;
this.writer = writer;
this.storageGroup = storageGroup;
+ if (threadSize > memTable.getChunkGroupNumber()) {
+ threadSize = memTable.getChunkGroupNumber();
+ }
+ this.encodingTaskQueues = new LinkedBlockingQueue[threadSize];
+ ioTaskQueues = new LinkedBlockingQueue[threadSize];
+ for (int i = 0; i < threadSize; i++) {
+ ioTaskQueues[i] =
+ config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()
+ ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
+ : new LinkedBlockingQueue<>();
+ encodingTaskQueues[i] = new LinkedBlockingQueue<>();
+ }
+
this.encodingTaskFutures = submitEncodingTasks();
this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
LOGGER.debug(
@@ -372,7 +373,7 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
continue;
}
for (j = 0; j < threadSize % Byte.SIZE; j++) {
- if ((finished[threadSize / Byte.SIZE] & BIT_UTIL[j]) != 1) {
+ if ((finished[threadSize / Byte.SIZE] & BIT_UTIL[j]) == 0) {
// not finished.
break;
}
@@ -424,5 +425,5 @@ public class MultiThreadMemTableFlushTask implements IMemTableFlushTask {
}
}
- private static final short[] BIT_UTIL = new short[] {1, 2, 4, 8, 16, 32, 64, 0XFF};
+ private static final byte[] BIT_UTIL = new byte[] {1, 2, 4, 8, 16, 32, 64, -128};
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 21c9bf2..9de3f23 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -208,6 +208,11 @@ public abstract class AbstractMemTable implements IMemTable {
}
@Override
+ public int getChunkGroupNumber() {
+ return memTableMap.size();
+ }
+
+ @Override
public long getTotalPointsNum() {
return totalPointsNum;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index ce412a2..1cc7b2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -76,6 +76,8 @@ public interface IMemTable {
int getSeriesNumber();
+ int getChunkGroupNumber();
+
long getTotalPointsNum();
void insert(InsertRowPlan insertRowPlan);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java
index 387daef..9a65600 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskTest.java
@@ -107,11 +107,17 @@ public class MemTableFlushTaskTest {
}
}
+ // @Test
+ public void test2() {
+ byte a = -128;
+ a &= 0XFF;
+ System.out.println(a);
+ }
+
@Test
public void test() throws ExecutionException, InterruptedException, IOException {
IMemTableFlushTask task = new MultiThreadMemTableFlushTask(memTable, writer, "root.sg");
task.syncFlushMemTable();
- System.out.println("end file.....");
writer.endFile();
writer.close();