You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/05/26 05:02:04 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1] Fix potential deadlock when freeing memory in MemoryPool
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new db029339547 [To rel/1.1] Fix potential deadlock when freeing memory in MemoryPool
db029339547 is described below
commit db029339547867f1fb481ae0375f7f775fa42443
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Fri May 26 13:01:57 2023 +0800
[To rel/1.1] Fix potential deadlock when freeing memory in MemoryPool
---
.../src/assembly/resources/conf/iotdb-common.properties | 5 +++--
.../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 5 +++--
.../mpp/execution/exchange/MPPDataExchangeManager.java | 10 +++++++---
.../db/mpp/execution/exchange/SharedTsBlockQueue.java | 16 +++++++++++++---
.../db/mpp/execution/exchange/LocalSinkChannelTest.java | 14 ++++++++++++--
.../db/mpp/execution/exchange/LocalSourceHandleTest.java | 14 ++++++++++++--
.../mpp/execution/exchange/SharedTsBlockQueueTest.java | 7 ++++++-
7 files changed, 56 insertions(+), 15 deletions(-)
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 603016c6bfc..75f1c972fb5 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -427,9 +427,10 @@ cluster_name=defaultCluster
# Datatype: int
# query_thread_count=0
-# How many pipeline drivers will be created for one fragment instance. When <= 0, use CPU core number / 2.
+# How many pipeline drivers will be created for one fragment instance (FI). The default degree of parallelism (dop) of 1 means an FI will not be split further.
+# CPU core number / 2 is a possible alternative value.
# Datatype: int
-# degree_of_query_parallelism=0
+# degree_of_query_parallelism=1
# The amount of data iterate each time in server (the number of data strips, that is, the number of different timestamps.)
# Datatype: int
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ee10b7eec50..d9484295f4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -324,7 +324,8 @@ public class IoTDBConfig {
/** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */
private int queryThreadCount = Runtime.getRuntime().availableProcessors();
- private int degreeOfParallelism = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
+ /** Degree of parallelism (dop); the default is 1 for now. */
+ private int degreeOfParallelism = 1;
/** How many queries can be concurrently executed. When <= 0, use 1000. */
private int maxAllowedConcurrentQueries = 1000;
@@ -1543,7 +1544,7 @@ public class IoTDBConfig {
}
public void setDegreeOfParallelism(int degreeOfParallelism) {
- this.degreeOfParallelism = degreeOfParallelism;
+ this.degreeOfParallelism = Math.max(1, degreeOfParallelism);
}
public int getDegreeOfParallelism() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 1e25adb71cd..886205d1306 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -568,7 +568,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
queue = localSourceHandle.getSharedTsBlockQueue();
} else {
LOGGER.debug("Create SharedTsBlockQueue");
- queue = new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId, localMemoryManager);
+ queue =
+ new SharedTsBlockQueue(
+ localFragmentInstanceId, localPlanNodeId, localMemoryManager, executorService);
}
return new LocalSinkChannel(
@@ -591,7 +593,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
new SharedTsBlockQueue(
driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
planNodeId,
- localMemoryManager);
+ localMemoryManager,
+ executorService);
queue.allowAddingTsBlock();
return new LocalSinkChannel(
queue,
@@ -747,7 +750,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
} else {
LOGGER.debug("Create SharedTsBlockQueue");
queue =
- new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager);
+ new SharedTsBlockQueue(
+ remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager, executorService);
}
LocalSourceHandle localSourceHandle =
new LocalSourceHandle(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index f5f1884f690..6566d767d43 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -38,9 +38,9 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.util.LinkedList;
import java.util.Queue;
+import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
/** This is not thread safe class, the caller should ensure multi-threads safety. */
@NotThreadSafe
@@ -80,10 +80,14 @@ public class SharedTsBlockQueue {
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
+ // Executor on which the SharedTsBlockQueue listener is run
+ private final ExecutorService executorService;
+
public SharedTsBlockQueue(
TFragmentInstanceId fragmentInstanceId,
String planNodeId,
- LocalMemoryManager localMemoryManager) {
+ LocalMemoryManager localMemoryManager,
+ ExecutorService executorService) {
this.localFragmentInstanceId =
Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null");
this.fullFragmentInstanceId =
@@ -91,6 +95,7 @@ public class SharedTsBlockQueue {
this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null");
this.localMemoryManager =
Validate.notNull(localMemoryManager, "local memory manager cannot be null");
+ this.executorService = Validate.notNull(executorService, "ExecutorService can not be null.");
}
public boolean hasNoMoreTsBlocks() {
@@ -228,7 +233,12 @@ public class SharedTsBlockQueue {
}
}
},
- directExecutor());
+ // Using directExecutor() here could lead to deadlock: Thread A holds the lock of
+ // SharedTsBlockQueueA and tries to invoke the listener of SharedTsBlockQueueB
+ // (when freeing memory to complete a MemoryReservationFuture), while Thread B
+ // holds the lock of SharedTsBlockQueueB and tries to invoke the listener of
+ // SharedTsBlockQueueA.
+ executorService);
} else { // reserve memory succeeded, add the TsBlock directly
queue.add(tsBlock);
if (!blocked.isDone()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
index dac0ec75d85..694edfc6b6e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
@@ -32,6 +32,8 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import static org.testcontainers.shaded.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
public class LocalSinkChannelTest {
@Test
public void testSend() {
@@ -50,7 +52,11 @@ public class LocalSinkChannelTest {
SinkListener mockSinkListener = Mockito.mock(SinkListener.class);
// Construct a shared TsBlock queue.
SharedTsBlockQueue queue =
- new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId, mockLocalMemoryManager);
+ new SharedTsBlockQueue(
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ mockLocalMemoryManager,
+ newDirectExecutorService());
// Construct Sink.
LocalSinkChannel localSinkChannel =
@@ -137,7 +143,11 @@ public class LocalSinkChannelTest {
SinkListener mockSinkListener = Mockito.mock(SinkListener.class);
// Construct a shared tsblock queue.
SharedTsBlockQueue queue =
- new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId, mockLocalMemoryManager);
+ new SharedTsBlockQueue(
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ mockLocalMemoryManager,
+ newDirectExecutorService());
// Construct SinkChannel.
LocalSinkChannel localSinkChannel =
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
index aa15f199676..db57cd0c329 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
@@ -30,6 +30,8 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import static org.testcontainers.shaded.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
public class LocalSourceHandleTest {
@Test
public void testReceive() {
@@ -47,7 +49,11 @@ public class LocalSourceHandleTest {
SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
// Construct a shared TsBlock queue.
SharedTsBlockQueue queue =
- new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId, mockLocalMemoryManager);
+ new SharedTsBlockQueue(
+ localFragmentInstanceId,
+ localPlanNodeId,
+ mockLocalMemoryManager,
+ newDirectExecutorService());
LocalSourceHandle localSourceHandle =
new LocalSourceHandle(
@@ -91,7 +97,11 @@ public class LocalSourceHandleTest {
SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
// Construct a shared tsblock queue.
SharedTsBlockQueue queue =
- new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId, mockLocalMemoryManager);
+ new SharedTsBlockQueue(
+ localFragmentInstanceId,
+ localPlanNodeId,
+ mockLocalMemoryManager,
+ newDirectExecutorService());
LocalSourceHandle localSourceHandle =
new LocalSourceHandle(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
index e5336d2ac2a..9c3399a6e17 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
@@ -33,6 +33,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
+import static org.testcontainers.shaded.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
public class SharedTsBlockQueueTest {
@Test(timeout = 5000L)
public void concurrencyTest() {
@@ -46,7 +48,10 @@ public class SharedTsBlockQueueTest {
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
SharedTsBlockQueue queue =
new SharedTsBlockQueue(
- new TFragmentInstanceId(queryId, 0, "0"), "test", mockLocalMemoryManager);
+ new TFragmentInstanceId(queryId, 0, "0"),
+ "test",
+ mockLocalMemoryManager,
+ newDirectExecutorService());
queue.getCanAddTsBlock().set(null);
queue.setMaxBytesCanReserve(Long.MAX_VALUE);