You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/05/26 05:02:04 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1] Fix potential deadlock when freeing memory in MemoryPool

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new db029339547 [To rel/1.1] Fix potential deadlock when freeing memory in MemoryPool
db029339547 is described below

commit db029339547867f1fb481ae0375f7f775fa42443
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Fri May 26 13:01:57 2023 +0800

    [To rel/1.1] Fix potential deadlock when freeing memory in MemoryPool
---
 .../src/assembly/resources/conf/iotdb-common.properties  |  5 +++--
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java  |  5 +++--
 .../mpp/execution/exchange/MPPDataExchangeManager.java   | 10 +++++++---
 .../db/mpp/execution/exchange/SharedTsBlockQueue.java    | 16 +++++++++++++---
 .../db/mpp/execution/exchange/LocalSinkChannelTest.java  | 14 ++++++++++++--
 .../db/mpp/execution/exchange/LocalSourceHandleTest.java | 14 ++++++++++++--
 .../mpp/execution/exchange/SharedTsBlockQueueTest.java   |  7 ++++++-
 7 files changed, 56 insertions(+), 15 deletions(-)

diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 603016c6bfc..75f1c972fb5 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -427,9 +427,10 @@ cluster_name=defaultCluster
 # Datatype: int
 # query_thread_count=0
 
-# How many pipeline drivers will be created for one fragment instance. When <= 0, use CPU core number / 2.
+# How many pipeline drivers will be created for one fragment instance. The default degree of parallelism (dop) of 1 means the fragment instance (FI) will not be split further.
+# Setting it to CPU core number / 2 is another reasonable choice.
 # Datatype: int
-# degree_of_query_parallelism=0
+# degree_of_query_parallelism=1
 
 # The amount of data iterate each time in server (the number of data strips, that is, the number of different timestamps.)
 # Datatype: int
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ee10b7eec50..d9484295f4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -324,7 +324,8 @@ public class IoTDBConfig {
   /** How many threads can concurrently execute query statement. When <= 0, use CPU core number. */
   private int queryThreadCount = Runtime.getRuntime().availableProcessors();
 
-  private int degreeOfParallelism = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
+  /** default dop = 1 for now */
+  private int degreeOfParallelism = 1;
 
   /** How many queries can be concurrently executed. When <= 0, use 1000. */
   private int maxAllowedConcurrentQueries = 1000;
@@ -1543,7 +1544,7 @@ public class IoTDBConfig {
   }
 
   public void setDegreeOfParallelism(int degreeOfParallelism) {
-    this.degreeOfParallelism = degreeOfParallelism;
+    this.degreeOfParallelism = Math.max(1, degreeOfParallelism);
   }
 
   public int getDegreeOfParallelism() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 1e25adb71cd..886205d1306 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -568,7 +568,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       queue = localSourceHandle.getSharedTsBlockQueue();
     } else {
       LOGGER.debug("Create SharedTsBlockQueue");
-      queue = new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId, localMemoryManager);
+      queue =
+          new SharedTsBlockQueue(
+              localFragmentInstanceId, localPlanNodeId, localMemoryManager, executorService);
     }
 
     return new LocalSinkChannel(
@@ -591,7 +593,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         new SharedTsBlockQueue(
             driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
             planNodeId,
-            localMemoryManager);
+            localMemoryManager,
+            executorService);
     queue.allowAddingTsBlock();
     return new LocalSinkChannel(
         queue,
@@ -747,7 +750,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     } else {
       LOGGER.debug("Create SharedTsBlockQueue");
       queue =
-          new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager);
+          new SharedTsBlockQueue(
+              remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager, executorService);
     }
     LocalSourceHandle localSourceHandle =
         new LocalSourceHandle(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index f5f1884f690..6566d767d43 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -38,9 +38,9 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.util.LinkedList;
 import java.util.Queue;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 
 /** This is not thread safe class, the caller should ensure multi-threads safety. */
 @NotThreadSafe
@@ -80,10 +80,14 @@ public class SharedTsBlockQueue {
   private long maxBytesCanReserve =
       IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
 
+  // used for SharedTsBlockQueue listener
+  private final ExecutorService executorService;
+
   public SharedTsBlockQueue(
       TFragmentInstanceId fragmentInstanceId,
       String planNodeId,
-      LocalMemoryManager localMemoryManager) {
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService) {
     this.localFragmentInstanceId =
         Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null");
     this.fullFragmentInstanceId =
@@ -91,6 +95,7 @@ public class SharedTsBlockQueue {
     this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null");
     this.localMemoryManager =
         Validate.notNull(localMemoryManager, "local memory manager cannot be null");
+    this.executorService = Validate.notNull(executorService, "ExecutorService can not be null.");
   }
 
   public boolean hasNoMoreTsBlocks() {
@@ -228,7 +233,12 @@ public class SharedTsBlockQueue {
               }
             }
           },
-          directExecutor());
+          // Use directExecutor() here could lead to deadlock. Thread A holds lock of
+          // SharedTsBlockQueueA and tries to invoke the listener of
+          // SharedTsBlockQueueB(when freeing memory to complete MemoryReservationFuture) while
+          // Thread B holds lock of SharedTsBlockQueueB and tries to invoke the listener of
+          // SharedTsBlockQueueA
+          executorService);
     } else { // reserve memory succeeded, add the TsBlock directly
       queue.add(tsBlock);
       if (!blocked.isDone()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
index dac0ec75d85..694edfc6b6e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
@@ -32,6 +32,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.testcontainers.shaded.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
 public class LocalSinkChannelTest {
   @Test
   public void testSend() {
@@ -50,7 +52,11 @@ public class LocalSinkChannelTest {
     SinkListener mockSinkListener = Mockito.mock(SinkListener.class);
     // Construct a shared TsBlock queue.
     SharedTsBlockQueue queue =
-        new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId, mockLocalMemoryManager);
+        new SharedTsBlockQueue(
+            remoteFragmentInstanceId,
+            remotePlanNodeId,
+            mockLocalMemoryManager,
+            newDirectExecutorService());
 
     // Construct Sink.
     LocalSinkChannel localSinkChannel =
@@ -137,7 +143,11 @@ public class LocalSinkChannelTest {
     SinkListener mockSinkListener = Mockito.mock(SinkListener.class);
     // Construct a shared tsblock queue.
     SharedTsBlockQueue queue =
-        new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId, mockLocalMemoryManager);
+        new SharedTsBlockQueue(
+            remoteFragmentInstanceId,
+            remotePlanNodeId,
+            mockLocalMemoryManager,
+            newDirectExecutorService());
 
     // Construct SinkChannel.
     LocalSinkChannel localSinkChannel =
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
index aa15f199676..db57cd0c329 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
@@ -30,6 +30,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.testcontainers.shaded.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
 public class LocalSourceHandleTest {
   @Test
   public void testReceive() {
@@ -47,7 +49,11 @@ public class LocalSourceHandleTest {
     SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
     // Construct a shared TsBlock queue.
     SharedTsBlockQueue queue =
-        new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId, mockLocalMemoryManager);
+        new SharedTsBlockQueue(
+            localFragmentInstanceId,
+            localPlanNodeId,
+            mockLocalMemoryManager,
+            newDirectExecutorService());
 
     LocalSourceHandle localSourceHandle =
         new LocalSourceHandle(
@@ -91,7 +97,11 @@ public class LocalSourceHandleTest {
     SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
     // Construct a shared tsblock queue.
     SharedTsBlockQueue queue =
-        new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId, mockLocalMemoryManager);
+        new SharedTsBlockQueue(
+            localFragmentInstanceId,
+            localPlanNodeId,
+            mockLocalMemoryManager,
+            newDirectExecutorService());
 
     LocalSourceHandle localSourceHandle =
         new LocalSourceHandle(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
index e5336d2ac2a..9c3399a6e17 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
@@ -33,6 +33,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.testcontainers.shaded.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
 public class SharedTsBlockQueueTest {
   @Test(timeout = 5000L)
   public void concurrencyTest() {
@@ -46,7 +48,10 @@ public class SharedTsBlockQueueTest {
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
     SharedTsBlockQueue queue =
         new SharedTsBlockQueue(
-            new TFragmentInstanceId(queryId, 0, "0"), "test", mockLocalMemoryManager);
+            new TFragmentInstanceId(queryId, 0, "0"),
+            "test",
+            mockLocalMemoryManager,
+            newDirectExecutorService());
     queue.getCanAddTsBlock().set(null);
     queue.setMaxBytesCanReserve(Long.MAX_VALUE);