Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/23 05:04:45 UTC

[iotdb] branch concurrentBug updated (ecb54e02a9 -> 4ff9fb650e)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch concurrentBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from ecb54e02a9 merge master
     new 16917f89ea Reduce the scope of lock in MemoryPool
     add 659fa27eaa Update IoTDBDescriptor.java (#9420)
     add 788c58dd7b [IOTDB-5715] Improve the performance of query order by time desc
     add 1b9c9eff30 [IOTDB-5716] Wrong dependency when pipeline consumeOneByOneOperator
     add 09c7fa77c4 remove aligned time series id in nonAligned measurement iterator (#9410)
     new 4ff9fb650e Merge branch 'master' into concurrentBug

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  12 +-
 .../impl/ReadChunkCompactionPerformer.java         |   7 +-
 .../execute/utils/MultiTsFileDeviceIterator.java   |   7 +
 .../db/exception/runtime/MemoryLeakException.java  |   7 +-
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |   4 +
 .../execution/exchange/MPPDataExchangeManager.java |   6 +
 .../mpp/execution/exchange/SharedTsBlockQueue.java |  15 +-
 .../mpp/execution/exchange/sink/SinkChannel.java   |   8 -
 .../execution/exchange/source/SourceHandle.java    |   8 -
 .../fragment/FragmentInstanceExecution.java        |   5 +
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  |  93 ++++----
 .../process/join/RowBasedTimeJoinOperator.java     |   2 +-
 .../process/join/merge/AscTimeComparator.java      |   5 +
 .../process/join/merge/DescTimeComparator.java     |   5 +
 .../process/join/merge/TimeComparator.java         |   3 +
 .../db/mpp/execution/schedule/DriverScheduler.java |   2 +-
 .../utils/MultiTsFileDeviceIteratorTest.java       | 257 +++++++++++++++++----
 .../db/mpp/plan/plan/PipelineBuilderTest.java      |  96 +++++++-
 18 files changed, 403 insertions(+), 139 deletions(-)
 copy service-rpc/src/main/java/org/apache/iotdb/rpc/ConfigNodeConnectionException.java => server/src/main/java/org/apache/iotdb/db/exception/runtime/MemoryLeakException.java (84%)


[iotdb] 01/02: Reduce the scope of lock in MemoryPool

Posted by xi...@apache.org.

xiangweiwei pushed a commit to branch concurrentBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 16917f89ea1736bf0204d06e7b1b67c86dcc481c
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Mar 23 13:03:53 2023 +0800

    Reduce the scope of lock in MemoryPool
---
 .../db/exception/runtime/MemoryLeakException.java  | 27 +++++++
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |  4 +
 .../execution/exchange/MPPDataExchangeManager.java |  6 ++
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 15 ++--
 .../mpp/execution/exchange/sink/SinkChannel.java   |  8 --
 .../execution/exchange/source/SourceHandle.java    |  8 --
 .../fragment/FragmentInstanceExecution.java        |  5 ++
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  | 93 +++++++++++-----------
 8 files changed, 92 insertions(+), 74 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/exception/runtime/MemoryLeakException.java b/server/src/main/java/org/apache/iotdb/db/exception/runtime/MemoryLeakException.java
new file mode 100644
index 0000000000..beb9a83951
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/runtime/MemoryLeakException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception.runtime;
+
+public class MemoryLeakException extends RuntimeException {
+
+  public MemoryLeakException(String message) {
+    super(message);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index 793066b94a..24164c4e89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -57,6 +57,10 @@ public class FragmentInstanceId {
     return instanceId;
   }
 
+  public String getFragmentInstanceId() {
+    return fragmentId + "." + instanceId;
+  }
+
   public String toString() {
     return fullId;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 501781b408..9a58904c79 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -470,6 +470,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     return mppDataExchangeService;
   }
 
+  public void deRegisterFragmentInstanceFromMemoryPool(String queryId, String fragmentInstanceId) {
+    localMemoryManager
+        .getQueryPool()
+        .deRegisterFragmentInstanceToQueryMemoryMap(queryId, fragmentInstanceId);
+  }
+
   private synchronized ISinkChannel createLocalSinkChannel(
       TFragmentInstanceId localFragmentInstanceId,
       TFragmentInstanceId remoteFragmentInstanceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 53d668cc86..e496ac98e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -30,12 +30,11 @@ import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.commons.lang3.Validate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.NotThreadSafe;
-
 import java.util.LinkedList;
 import java.util.Queue;
 
@@ -91,6 +90,10 @@ public class SharedTsBlockQueue {
     this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null");
     this.localMemoryManager =
         Validate.notNull(localMemoryManager, "local memory manager cannot be null");
+    localMemoryManager
+        .getQueryPool()
+        .registerPlanNodeIdToQueryMemoryMap(
+            fragmentInstanceId.queryId, fullFragmentInstanceId, planNodeId);
   }
 
   public boolean hasNoMoreTsBlocks() {
@@ -260,10 +263,6 @@ public class SharedTsBlockQueue {
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
-    localMemoryManager
-        .getQueryPool()
-        .clearMemoryReservationMap(
-            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
   }
 
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
@@ -289,10 +288,6 @@ public class SharedTsBlockQueue {
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
-    localMemoryManager
-        .getQueryPool()
-        .clearMemoryReservationMap(
-            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
   }
 
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index 5cd28de462..b316e2bbd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -222,10 +222,6 @@ public class SinkChannel implements ISinkChannel {
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
-    localMemoryManager
-        .getQueryPool()
-        .clearMemoryReservationMap(
-            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
     sinkListener.onAborted(this);
     aborted = true;
     LOGGER.debug("[EndAbortSinkChannel]");
@@ -249,10 +245,6 @@ public class SinkChannel implements ISinkChannel {
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
-    localMemoryManager
-        .getQueryPool()
-        .clearMemoryReservationMap(
-            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
     sinkListener.onFinish(this);
     closed = true;
     LOGGER.debug("[EndCloseSinkChannel]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index b66dd4bce4..5d118d310c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -332,10 +332,6 @@ public class SourceHandle implements ISourceHandle {
                 bufferRetainedSizeInBytes);
         bufferRetainedSizeInBytes = 0;
       }
-      localMemoryManager
-          .getQueryPool()
-          .clearMemoryReservationMap(
-              localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
       aborted = true;
       sourceHandleListener.onAborted(this);
     }
@@ -369,10 +365,6 @@ public class SourceHandle implements ISourceHandle {
                 bufferRetainedSizeInBytes);
         bufferRetainedSizeInBytes = 0;
       }
-      localMemoryManager
-          .getQueryPool()
-          .clearMemoryReservationMap(
-              localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
       closed = true;
       executorService.submit(new SendCloseSinkChannelEventTask());
       currSequenceId = lastSequenceId + 1;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index f9c17c9963..b402f0892e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
+import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -137,6 +138,10 @@ public class FragmentInstanceExecution {
             context.releaseResource();
             // help for gc
             drivers = null;
+            MPPDataExchangeService.getInstance()
+                .getMPPDataExchangeManager()
+                .deRegisterFragmentInstanceFromMemoryPool(
+                    instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId());
             if (newState.isFailed()) {
               scheduler.abortFragmentInstance(instanceId);
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index 37dfc13d9a..a9acf90e43 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -20,19 +20,18 @@
 package org.apache.iotdb.db.mpp.execution.memory;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.runtime.MemoryLeakException;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.Validate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -149,6 +148,44 @@ public class MemoryPool {
     return id;
   }
 
+  /**
+   * Before execution, register the memory map entry for the given queryId, fragmentInstanceId, and
+   * planNodeId in queryMemoryReservations.
+   */
+  public synchronized void registerPlanNodeIdToQueryMemoryMap(
+      String queryId, String fragmentInstanceId, String planNodeId) {
+    queryMemoryReservations
+        .computeIfAbsent(queryId, x -> new ConcurrentHashMap<>())
+        .computeIfAbsent(fragmentInstanceId, x -> new ConcurrentHashMap<>())
+        .putIfAbsent(planNodeId, 0L);
+  }
+
+  /**
+   * If all fragmentInstanceIds related to one queryId have been registered, the queryId can be
+   * cleared when the last fragment instance is deregistered.
+   *
+   * <p>If some fragmentInstanceIds have not been registered when the queryId is cleared, they will
+   * register the queryId again under the lock, so there is no concurrency problem.
+   */
+  public void deRegisterFragmentInstanceToQueryMemoryMap(
+      String queryId, String fragmentInstanceId) {
+    Map<String, Long> planNodeRelatedMemory =
+        queryMemoryReservations.get(queryId).get(fragmentInstanceId);
+    for (Long memoryReserved : planNodeRelatedMemory.values()) {
+      if (memoryReserved != 0) {
+        throw new MemoryLeakException(
+            "PlanNode related memory is not zero when deregistering the fragment instance from the query memory pool.");
+      }
+    }
+    synchronized (queryMemoryReservations) {
+      Map<String, Map<String, Long>> queryRelatedMemory = queryMemoryReservations.get(queryId);
+      queryRelatedMemory.remove(fragmentInstanceId);
+      if (queryRelatedMemory.isEmpty()) {
+        queryMemoryReservations.remove(queryId);
+      }
+    }
+  }
+
   /**
    * Reserve memory with bytesToReserve.
    *
@@ -338,47 +375,7 @@ public class MemoryPool {
     return maxBytes - remainingBytes.get();
   }
 
-  public void clearMemoryReservationMap(
-      String queryId, String fragmentInstanceId, String planNodeId) {
-    Map<String, Map<String, Long>> instanceBytesReserved = queryMemoryReservations.get(queryId);
-    Map<String, Long> planNodeIdToBytesReserved =
-        queryMemoryReservations
-            .getOrDefault(queryId, Collections.emptyMap())
-            .get(fragmentInstanceId);
-    if (instanceBytesReserved == null || planNodeIdToBytesReserved == null) {
-      return;
-    }
-
-    Long newValue =
-        planNodeIdToBytesReserved.computeIfPresent(
-            planNodeId,
-            (k, memoryReserved) -> {
-              if (memoryReserved == 0) {
-                return null;
-              }
-              return memoryReserved;
-            });
-    if (newValue == null) {
-      instanceBytesReserved.computeIfPresent(
-          fragmentInstanceId,
-          (k, kPlanNodeBytesReserved) -> {
-            if (kPlanNodeBytesReserved.isEmpty()) {
-              return null;
-            }
-            return kPlanNodeBytesReserved;
-          });
-      queryMemoryReservations.computeIfPresent(
-          queryId,
-          (k, kInstanceBytesReserved) -> {
-            if (kInstanceBytesReserved.isEmpty()) {
-              return null;
-            }
-            return kInstanceBytesReserved;
-          });
-    }
-  }
-
-  private boolean tryReserve(
+  public boolean tryReserve(
       String queryId,
       String fragmentInstanceId,
       String planNodeId,
@@ -388,8 +385,8 @@ public class MemoryPool {
     long queryRemainingBytes =
         maxBytesCanReserve
             - queryMemoryReservations
-                .computeIfAbsent(queryId, x -> new ConcurrentHashMap<>())
-                .computeIfAbsent(fragmentInstanceId, x -> new ConcurrentHashMap<>())
+                .get(queryId)
+                .get(fragmentInstanceId)
                 .merge(planNodeId, bytesToReserve, Long::sum);
     return tryRemainingBytes >= 0 && queryRemainingBytes >= 0;
   }
@@ -397,8 +394,8 @@ public class MemoryPool {
   private void rollbackReserve(
       String queryId, String fragmentInstanceId, String planNodeId, long bytesToReserve) {
     queryMemoryReservations
-        .computeIfAbsent(queryId, x -> new ConcurrentHashMap<>())
-        .computeIfAbsent(fragmentInstanceId, x -> new ConcurrentHashMap<>())
+        .get(queryId)
+        .get(fragmentInstanceId)
         .merge(planNodeId, -bytesToReserve, Long::sum);
     remainingBytes.addAndGet(bytesToReserve);
   }
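
The diff above replaces lazy map creation inside tryReserve/rollbackReserve with an explicit lifecycle: entries are registered before any reservation, looked up with plain get() while reserving, and removed when the fragment instance finishes, with a leak check. The following is a minimal, self-contained sketch of that lifecycle, not the actual MemoryPool. The class name MemoryPoolSketch and the methods register, free, deregister and isTracked are hypothetical, IllegalStateException stands in for MemoryLeakException, and the single shared monitor and the null guards in deregister are choices of this sketch rather than something taken from the commit.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified stand-in for the bookkeeping touched by this commit. */
final class MemoryPoolSketch {

  private final AtomicLong remainingBytes;

  // queryId -> fragmentInstanceId -> planNodeId -> bytes currently reserved
  private final Map<String, Map<String, Map<String, Long>>> reservations =
      new ConcurrentHashMap<>();

  MemoryPoolSketch(long maxBytes) {
    this.remainingBytes = new AtomicLong(maxBytes);
  }

  /** Creates the nested entries up front so later calls can use plain get(). */
  void register(String queryId, String fragmentInstanceId, String planNodeId) {
    synchronized (reservations) {
      reservations
          .computeIfAbsent(queryId, k -> new ConcurrentHashMap<>())
          .computeIfAbsent(fragmentInstanceId, k -> new ConcurrentHashMap<>())
          .putIfAbsent(planNodeId, 0L);
    }
  }

  /** Reserves bytes without taking the lock; assumes register() was called first. */
  boolean tryReserve(String queryId, String fragmentInstanceId, String planNodeId, long bytes) {
    Map<String, Long> planNodes = reservations.get(queryId).get(fragmentInstanceId);
    if (remainingBytes.addAndGet(-bytes) < 0) {
      remainingBytes.addAndGet(bytes); // roll back the pool-wide counter
      return false;
    }
    planNodes.merge(planNodeId, bytes, Long::sum);
    return true;
  }

  /** Returns previously reserved bytes to the pool. */
  void free(String queryId, String fragmentInstanceId, String planNodeId, long bytes) {
    reservations.get(queryId).get(fragmentInstanceId).merge(planNodeId, -bytes, Long::sum);
    remainingBytes.addAndGet(bytes);
  }

  /** Removes a finished fragment instance; fails if any plan node still holds memory. */
  void deregister(String queryId, String fragmentInstanceId) {
    synchronized (reservations) {
      Map<String, Map<String, Long>> fragments = reservations.get(queryId);
      if (fragments == null || !fragments.containsKey(fragmentInstanceId)) {
        return; // nothing registered; guard added by this sketch
      }
      for (long reserved : fragments.get(fragmentInstanceId).values()) {
        if (reserved != 0) {
          throw new IllegalStateException("memory still reserved at deregistration");
        }
      }
      fragments.remove(fragmentInstanceId);
      if (fragments.isEmpty()) {
        reservations.remove(queryId); // last fragment instance gone: drop the query
      }
    }
  }

  boolean isTracked(String queryId) {
    return reservations.containsKey(queryId);
  }

  public static void main(String[] args) {
    MemoryPoolSketch pool = new MemoryPoolSketch(100);
    pool.register("q1", "f1.0", "node1");
    System.out.println("reserved: " + pool.tryReserve("q1", "f1.0", "node1", 60));
    pool.free("q1", "f1.0", "node1", 60);
    pool.deregister("q1", "f1.0");
    System.out.println("still tracked: " + pool.isTracked("q1"));
  }
}
```

Because the maps are only created in register and only removed in deregister, the hot path in tryReserve needs no structural changes to the outer maps, which is what allows the lock scope to shrink.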


[iotdb] 02/02: Merge branch 'master' into concurrentBug

Posted by xi...@apache.org.

xiangweiwei pushed a commit to branch concurrentBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4ff9fb650efec47b5b5b2ae5cc2340465db28779
Merge: 16917f89ea 09c7fa77c4
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Mar 23 13:04:28 2023 +0800

    Merge branch 'master' into concurrentBug

 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  12 +-
 .../impl/ReadChunkCompactionPerformer.java         |   7 +-
 .../execute/utils/MultiTsFileDeviceIterator.java   |   7 +
 .../process/join/RowBasedTimeJoinOperator.java     |   2 +-
 .../process/join/merge/AscTimeComparator.java      |   5 +
 .../process/join/merge/DescTimeComparator.java     |   5 +
 .../process/join/merge/TimeComparator.java         |   3 +
 .../db/mpp/execution/schedule/DriverScheduler.java |   2 +-
 .../utils/MultiTsFileDeviceIteratorTest.java       | 257 +++++++++++++++++----
 .../db/mpp/plan/plan/PipelineBuilderTest.java      |  96 +++++++-
 10 files changed, 334 insertions(+), 62 deletions(-)